flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [6/6] flink git commit: [FLINK-4192] [metrics] Move metrics classes out of 'flink-core'
Date Tue, 26 Jul 2016 10:31:05 GMT
[FLINK-4192] [metrics] Move metrics classes out of 'flink-core'

- moved user-facing API to 'flink-metrics/flink-metrics-core'
- moved JMXReporter to 'flink-metrics/flink-metrics-jmx'
- moved remaining metric classes to 'flink-runtime'

This closes #2226


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3fec1f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3fec1f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3fec1f9

Branch: refs/heads/master
Commit: e3fec1f9ad737c01fc5c22bd0c327bfce91938a9
Parents: e4fe89d
Author: zentol <chesnay@apache.org>
Authored: Fri Jul 22 12:33:16 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Tue Jul 26 12:30:15 2016 +0200

----------------------------------------------------------------------
 docs/apis/metrics.md                            |   4 +-
 flink-core/pom.xml                              |   6 +
 .../apache/flink/metrics/CharacterFilter.java   |  36 --
 .../java/org/apache/flink/metrics/Counter.java  |  59 ---
 .../java/org/apache/flink/metrics/Gauge.java    |  34 --
 .../org/apache/flink/metrics/Histogram.java     |  52 --
 .../flink/metrics/HistogramStatistics.java      |  81 ---
 .../java/org/apache/flink/metrics/Metric.java   |  28 --
 .../org/apache/flink/metrics/MetricGroup.java   | 133 -----
 .../apache/flink/metrics/MetricRegistry.java    | 273 -----------
 .../org/apache/flink/metrics/SimpleCounter.java |  71 ---
 .../metrics/groups/AbstractMetricGroup.java     | 292 -----------
 .../metrics/groups/ComponentMetricGroup.java    |  78 ---
 .../metrics/groups/GenericMetricGroup.java      |  49 --
 .../flink/metrics/groups/IOMetricGroup.java     |  52 --
 .../groups/JobManagerJobMetricGroup.java        |  69 ---
 .../metrics/groups/JobManagerMetricGroup.java   | 104 ----
 .../flink/metrics/groups/JobMetricGroup.java    |  62 ---
 .../metrics/groups/OperatorMetricGroup.java     |  62 ---
 .../flink/metrics/groups/ProxyMetricGroup.java  |  90 ----
 .../groups/TaskManagerJobMetricGroup.java       | 122 -----
 .../metrics/groups/TaskManagerMetricGroup.java  | 134 -----
 .../flink/metrics/groups/TaskMetricGroup.java   | 169 -------
 .../groups/UnregisteredMetricsGroup.java        |  84 ----
 .../flink/metrics/groups/scope/ScopeFormat.java | 490 ------------------
 .../metrics/groups/scope/ScopeFormats.java      | 130 -----
 .../metrics/reporter/AbstractReporter.java      |  78 ---
 .../flink/metrics/reporter/JMXReporter.java     | 491 -------------------
 .../flink/metrics/reporter/MetricReporter.java  |  75 ---
 .../flink/metrics/reporter/Scheduled.java       |  34 --
 .../flink/metrics/MetricRegistryTest.java       | 194 --------
 .../metrics/groups/JobManagerGroupTest.java     | 108 ----
 .../metrics/groups/JobManagerJobGroupTest.java  |  90 ----
 .../groups/MetricGroupRegistrationTest.java     | 114 -----
 .../flink/metrics/groups/MetricGroupTest.java   | 149 ------
 .../flink/metrics/groups/OperatorGroupTest.java |  53 --
 .../metrics/groups/TaskManagerGroupTest.java    | 154 ------
 .../metrics/groups/TaskManagerJobGroupTest.java |  94 ----
 .../metrics/groups/TaskMetricGroupTest.java     | 167 -------
 .../flink/metrics/reporter/JMXReporterTest.java | 291 -----------
 .../apache/flink/metrics/util/TestReporter.java |  44 --
 flink-dist/pom.xml                              |   7 +
 flink-metrics/flink-metrics-core/pom.xml        |  35 ++
 .../apache/flink/metrics/CharacterFilter.java   |  36 ++
 .../java/org/apache/flink/metrics/Counter.java  |  56 +++
 .../java/org/apache/flink/metrics/Gauge.java    |  32 ++
 .../org/apache/flink/metrics/Histogram.java     |  49 ++
 .../flink/metrics/HistogramStatistics.java      |  78 +++
 .../java/org/apache/flink/metrics/Metric.java   |  25 +
 .../org/apache/flink/metrics/MetricConfig.java  |  62 +++
 .../org/apache/flink/metrics/MetricGroup.java   | 162 ++++++
 .../org/apache/flink/metrics/SimpleCounter.java |  73 +++
 .../groups/UnregisteredMetricsGroup.java        |  98 ++++
 .../metrics/reporter/AbstractReporter.java      |  76 +++
 .../flink/metrics/reporter/MetricReporter.java  |  73 +++
 .../flink/metrics/reporter/Scheduled.java       |  31 ++
 flink-metrics/flink-metrics-dropwizard/pom.xml  |  16 +-
 .../dropwizard/ScheduledDropwizardReporter.java |  12 +-
 .../ScheduledDropwizardReporterTest.java        |  17 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |   7 +-
 flink-metrics/flink-metrics-ganglia/pom.xml     |   2 +-
 .../flink/metrics/ganglia/GangliaReporter.java  |   4 +-
 flink-metrics/flink-metrics-graphite/pom.xml    |   2 +-
 .../metrics/graphite/GraphiteReporter.java      |   4 +-
 flink-metrics/flink-metrics-jmx/pom.xml         |  81 +++
 .../apache/flink/metrics/jmx/JMXReporter.java   | 491 +++++++++++++++++++
 .../flink/metrics/jmx/JMXReporterTest.java      | 289 +++++++++++
 .../jobmanager/JMXJobManagerMetricTest.java     | 119 +++++
 .../src/test/resources/log4j-test.properties    |  27 +
 .../src/test/resources/logback-test.xml         |  34 ++
 flink-metrics/flink-metrics-statsd/pom.xml      |   9 +-
 .../flink/metrics/statsd/StatsDReporter.java    |   4 +-
 .../metrics/statsd/StatsDReporterTest.java      |  11 +-
 flink-metrics/pom.xml                           |   2 +
 .../flink/runtime/execution/Environment.java    |   2 +-
 .../api/serialization/RecordSerializer.java     |   2 +-
 .../serialization/SpanningRecordSerializer.java |   2 +-
 .../io/network/api/writer/RecordWriter.java     |   2 +-
 .../partition/consumer/LocalInputChannel.java   |   2 +-
 .../partition/consumer/RemoteInputChannel.java  |   2 +-
 .../partition/consumer/SingleInputGate.java     |   2 +-
 .../partition/consumer/UnknownInputChannel.java |   2 +-
 .../flink/runtime/metrics/MetricRegistry.java   | 269 ++++++++++
 .../metrics/groups/AbstractMetricGroup.java     | 292 +++++++++++
 .../metrics/groups/ComponentMetricGroup.java    |  78 +++
 .../metrics/groups/GenericMetricGroup.java      |  47 ++
 .../runtime/metrics/groups/IOMetricGroup.java   |  52 ++
 .../groups/JobManagerJobMetricGroup.java        |  71 +++
 .../metrics/groups/JobManagerMetricGroup.java   | 103 ++++
 .../runtime/metrics/groups/JobMetricGroup.java  |  62 +++
 .../metrics/groups/OperatorMetricGroup.java     |  64 +++
 .../metrics/groups/ProxyMetricGroup.java        | 106 ++++
 .../groups/TaskManagerJobMetricGroup.java       | 125 +++++
 .../metrics/groups/TaskManagerMetricGroup.java  | 132 +++++
 .../runtime/metrics/groups/TaskMetricGroup.java | 170 +++++++
 .../metrics/scope/JobManagerJobScopeFormat.java |  46 ++
 .../metrics/scope/JobManagerScopeFormat.java    |  37 ++
 .../metrics/scope/OperatorScopeFormat.java      |  60 +++
 .../runtime/metrics/scope/ScopeFormat.java      | 307 ++++++++++++
 .../runtime/metrics/scope/ScopeFormats.java     | 153 ++++++
 .../scope/TaskManagerJobScopeFormat.java        |  48 ++
 .../metrics/scope/TaskManagerScopeFormat.java   |  38 ++
 .../runtime/metrics/scope/TaskScopeFormat.java  |  62 +++
 .../runtime/taskmanager/RuntimeEnvironment.java |   2 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   8 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  11 +-
 .../testingUtils/TestingJobManager.scala        |   2 +-
 .../ExecutionGraphMetricsTest.java              |  12 +-
 .../jobmanager/JobManagerMetricTest.java        | 118 -----
 .../runtime/metrics/MetricRegistryTest.java     | 197 ++++++++
 .../metrics/groups/JobManagerGroupTest.java     | 109 ++++
 .../metrics/groups/JobManagerJobGroupTest.java  |  90 ++++
 .../groups/MetricGroupRegistrationTest.java     | 114 +++++
 .../runtime/metrics/groups/MetricGroupTest.java | 149 ++++++
 .../metrics/groups/OperatorGroupTest.java       |  53 ++
 .../metrics/groups/TaskManagerGroupTest.java    | 271 ++++++++++
 .../metrics/groups/TaskManagerJobGroupTest.java |  94 ++++
 .../metrics/groups/TaskMetricGroupTest.java     | 168 +++++++
 .../runtime/metrics/util/TestReporter.java      |  44 ++
 .../operators/testutils/DummyEnvironment.java   |   2 +-
 .../operators/testutils/MockEnvironment.java    |   4 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |  10 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   2 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |   3 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   2 +-
 .../flink-connector-kafka-0.8/pom.xml           |   7 +
 .../flink-connector-kafka-0.9/pom.xml           |   7 +
 .../flink-connector-kafka-base/pom.xml          |   7 +
 .../connectors/kafka/KafkaTestBase.java         |   3 +-
 .../runtime/io/StreamInputProcessor.java        |   2 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   2 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   2 +-
 .../flink/yarn/TestingYarnJobManager.scala      |   2 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   2 +-
 136 files changed, 5677 insertions(+), 5088 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/docs/apis/metrics.md
----------------------------------------------------------------------
diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md
index 75e7a52..e8c2772 100644
--- a/docs/apis/metrics.md
+++ b/docs/apis/metrics.md
@@ -230,7 +230,7 @@ or by assigning unique names to jobs and operators.
 Metrics can be exposed to an external system by configuring a reporter in `conf/flink-conf.yaml`.
 
 - `metrics.reporter.class`: The class of the reporter to use.
-  - Example: org.apache.flink.metrics.reporter.JMXReporter
+  - Example: org.apache.flink.metrics.jmx.JMXReporter
 - `metrics.reporter.arguments`: A list of named parameters that are passed to the reporter.
   - Example: --host localhost --port 9010
 - `metrics.reporter.interval`: The interval between reports.
@@ -241,7 +241,7 @@ If the Reporter should send out reports regularly you have to implement the `Sch
 
 The following sections list the supported reporters.
 
-### JMX (org.apache.flink.metrics.reporter.JMXReporter)
+### JMX (org.apache.flink.metrics.jmx.JMXReporter)
 
 You don't have to include an additional dependency since the JMX reporter is available by default
 but not activated.

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 21d00b3..5496f8c 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -42,6 +42,12 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
 			<groupId>com.esotericsoftware.kryo</groupId>
 			<artifactId>kryo</artifactId>
 			<!-- managed version -->

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java b/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
deleted file mode 100644
index 1e9fbc4..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-/**
- * Interface for a character filter function. The filter function is given a string which the filter
- * can transform. The returned string is the transformation result.
- */
-public interface CharacterFilter {
-
-	/**
-	 * Filter the given string and generate a resulting string from it.
-	 *
-	 * For example, one implementation could filter out invalid characters from the input string.
-	 *
-	 * @param input Input string
-	 * @return Filtered result string
-	 */
-	String filterCharacters(String input);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
deleted file mode 100644
index ffb1cc7..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * A Counter is a {@link Metric} that measures a count.
- */
-@PublicEvolving
-public interface Counter extends Metric {
-
-	/**
-	 * Increment the current count by 1.
-	 */
-	void inc();
-
-	/**
-	 * Increment the current count by the given value.
-	 *
-	 * @param n value to increment the current count by
-	 */
-	void inc(long n);
-
-	/**
-	 * Decrement the current count by 1.
-	 */
-	void dec();
-
-	/**
-	 * Decrement the current count by the given value.
-	 *
-	 * @param n value to decrement the current count by
-	 */
-	void dec(long n);
-
-	/**
-	 * Returns the current count.
-	 *
-	 * @return current count
-	 */
-	long getCount();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
deleted file mode 100644
index 740645d..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * A Gauge is a {@link Metric} that calculates a specific value at a point in time.
- */
-@PublicEvolving
-public interface Gauge<T> extends Metric {
-	/**
-	 * Calculates and returns the measured value.
-	 *
-	 * @return calculated value
-	 */
-	T getValue();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
deleted file mode 100644
index 3fd1253..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * Histogram interface to be used with Flink's metrics system.
- *
- * The histogram allows to record values, get the current count of recorded values and create
- * histogram statistics for the currently seen elements.
- */
-@PublicEvolving
-public interface Histogram extends Metric {
-
-	/**
-	 * Update the histogram with the given value.
-	 *
-	 * @param value Value to update the histogram with
-	 */
-	void update(long value);
-
-	/**
-	 * Get the count of seen elements.
-	 *
-	 * @return Count of seen elements
-	 */
-	long getCount();
-
-	/**
-	 * Create statistics for the currently recorded elements.
-	 *
-	 * @return Statistics about the currently recorded elements
-	 */
-	HistogramStatistics getStatistics();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
deleted file mode 100644
index 476580c..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * Histogram statistics represent the current snapshot of elements recorded in the histogram.
- *
- * The histogram statistics allow to calculate values for quantiles, the mean, the standard
- * deviation, the minimum and the maximum.
- */
-@PublicEvolving
-public abstract class HistogramStatistics {
-
-	/**
-	 * Returns the value for the given quantile based on the represented histogram statistics.
-	 *
-	 * @param quantile Quantile to calculate the value for
-	 * @return Value for the given quantile
-	 */
-	public abstract double getQuantile(double quantile);
-
-	/**
-	 * Returns the elements of the statistics' sample
-	 *
-	 * @return Elements of the statistics' sample
-	 */
-	public abstract long[] getValues();
-
-	/**
-	 * Returns the size of the statistics' sample
-	 *
-	 * @return Size of the statistics' sample
-	 */
-	public abstract int size();
-
-	/**
-	 * Returns the mean of the histogram values.
-	 *
-	 * @return Mean of the histogram values
-	 */
-	public abstract double getMean();
-
-	/**
-	 * Returns the standard deviation of the distribution reflected by the histogram statistics.
-	 *
-	 * @return Standard deviation of histogram distribution
-	 */
-	public abstract double getStdDev();
-
-	/**
-	 * Returns the maximum value of the histogram.
-	 *
-	 * @return Maximum value of the histogram
-	 */
-	public abstract long getMax();
-
-	/**
-	 * Returns the minimum value of the histogram.
-	 *
-	 * @return Minimum value of the histogram
-	 */
-	public abstract long getMin();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
deleted file mode 100644
index 8054de0..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * Common super interface for all metrics.
- */
-@PublicEvolving
-public interface Metric {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
deleted file mode 100644
index b578cb3..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups.
- * 
- * <p>Instances of this class can be used to register new metrics with Flink and to create a nested
- * hierarchy based on the group names.
- * 
- * <p>A MetricGroup is uniquely identified by it's place in the hierarchy and name.
- */
-@PublicEvolving
-public interface MetricGroup {
-
-	// ------------------------------------------------------------------------
-	//  Metrics
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
-	 *
-	 * @param name name of the counter
-	 * @return the created counter
-	 */
-	Counter counter(int name);
-
-	/**
-	 * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
-	 *
-	 * @param name name of the counter
-	 * @return the created counter
-	 */
-	Counter counter(String name);
-
-	/**
-	 * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
-	 *
-	 * @param name    name of the counter
-	 * @param counter counter to register
-	 * @param <C>     counter type
-	 * @return the given counter
-	 */
-	<C extends Counter> C counter(int name, C counter);
-
-	/**
-	 * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
-	 *
-	 * @param name    name of the counter
-	 * @param counter counter to register
-	 * @param <C>     counter type
-	 * @return the given counter
-	 */
-	<C extends Counter> C counter(String name, C counter);
-	
-	/**
-	 * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
-	 *
-	 * @param name  name of the gauge
-	 * @param gauge gauge to register
-	 * @param <T>   return type of the gauge
-	 * @return the given gauge
-	 */
-	<T, G extends Gauge<T>> G gauge(int name, G gauge);
-
-	/**
-	 * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
-	 *
-	 * @param name  name of the gauge
-	 * @param gauge gauge to register
-	 * @param <T>   return type of the gauge
-	 * @return the given gauge
-	 */
-	<T, G extends Gauge<T>> G gauge(String name, G gauge);
-
-	/**
-	 * Registers a new {@link Histogram} with Flink.
-	 *
-	 * @param name name of the histogram
-	 * @param histogram histogram to register
-	 * @param <H> histogram type   
-	 * @return the registered histogram
-	 */
-	<H extends Histogram> H histogram(String name, H histogram);
-
-	/**
-	 * Registers a new {@link Histogram} with Flink.
-	 *
-	 * @param name name of the histogram
-	 * @param histogram histogram to register
-	 * @param <H> histogram type   
-	 * @return the registered histogram
-	 */
-	<H extends Histogram> H histogram(int name, H histogram);
-
-	// ------------------------------------------------------------------------
-	// Groups
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new MetricGroup and adds it to this groups sub-groups.
-	 *
-	 * @param name name of the group
-	 * @return the created group
-	 */
-	MetricGroup addGroup(int name);
-
-	/**
-	 * Creates a new MetricGroup and adds it to this groups sub-groups.
-	 *
-	 * @param name name of the group
-	 * @return the created group
-	 */
-	MetricGroup addGroup(String name);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
deleted file mode 100644
index 274821e..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.groups.scope.ScopeFormat;
-import org.apache.flink.metrics.groups.scope.ScopeFormats;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.TimerTask;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
- * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
- */
-@Internal
-public class MetricRegistry {
-	static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
-	
-	private final MetricReporter reporter;
-	private final ScheduledExecutorService executor;
-
-	private final ScopeFormats scopeFormats;
-
-	private final char delimiter;
-
-	/**
-	 * Creates a new MetricRegistry and starts the configured reporter.
-	 */
-	public MetricRegistry(Configuration config) {
-		// first parse the scope formats, these are needed for all reporters
-		ScopeFormats scopeFormats;
-		try {
-			scopeFormats = createScopeConfig(config);
-		}
-		catch (Exception e) {
-			LOG.warn("Failed to parse scope format, using default scope formats", e);
-			scopeFormats = new ScopeFormats();
-		}
-		this.scopeFormats = scopeFormats;
-
-		char delim;
-		try {
-			delim = config.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);
-		} catch (Exception e) {
-			LOG.warn("Failed to parse delimiter, using default delimiter.", e);
-			delim = '.';
-		}
-		this.delimiter = delim;
-
-		// second, instantiate any custom configured reporters
-		
-		final String className = config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
-		if (className == null) {
-			// by default, don't report anything
-			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
-			this.reporter = null;
-			this.executor = null;
-		}
-		else {
-			MetricReporter reporter;
-			ScheduledExecutorService executor = null;
-			try {
-				String configuredPeriod = config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
-				TimeUnit timeunit = TimeUnit.SECONDS;
-				long period = 10;
-				
-				if (configuredPeriod != null) {
-					try {
-						String[] interval = configuredPeriod.split(" ");
-						period = Long.parseLong(interval[0]);
-						timeunit = TimeUnit.valueOf(interval[1]);
-					}
-					catch (Exception e) {
-						LOG.error("Cannot parse report interval from config: " + configuredPeriod +
-								" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
-								"Using default reporting interval.");
-					}
-				}
-				
-				Configuration reporterConfig = createReporterConfig(config, timeunit, period);
-			
-				Class<?> reporterClass = Class.forName(className);
-				reporter = (MetricReporter) reporterClass.newInstance();
-				reporter.open(reporterConfig);
-
-				if (reporter instanceof Scheduled) {
-					executor = Executors.newSingleThreadScheduledExecutor();
-					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
-					
-					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
-				}
-			}
-			catch (Throwable t) {
-				shutdownExecutor();
-				LOG.error("Could not instantiate metrics reporter. No metrics will be exposed/reported.", t);
-				reporter = null;
-			}
-
-			this.reporter = reporter;
-			this.executor = executor;
-		}
-	}
-
-	public char getDelimiter() {
-		return this.delimiter;
-	}
-
-	public MetricReporter getReporter() {
-		return reporter;
-	}
-
-	/**
-	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
-	 */
-	public void shutdown() {
-		if (reporter != null) {
-			try {
-				reporter.close();
-			} catch (Throwable t) {
-				LOG.warn("Metrics reporter did not shut down cleanly", t);
-			}
-		}
-		shutdownExecutor();
-	}
-	
-	private void shutdownExecutor() {
-		if (executor != null) {
-			executor.shutdown();
-
-			try {
-				if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-					executor.shutdownNow();
-				}
-			} catch (InterruptedException e) {
-				executor.shutdownNow();
-			}
-		}
-	}
-
-	public ScopeFormats getScopeFormats() {
-		return scopeFormats;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Metrics (de)registration
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Registers a new {@link Metric} with this registry.
-	 *
-	 * @param metric      the metric that was added
-	 * @param metricName  the name of the metric
-	 * @param group       the group that contains the metric
-	 */
-	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
-		try {
-			if (reporter != null) {
-				reporter.notifyOfAddedMetric(metric, metricName, group);
-			}
-		} catch (Exception e) {
-			LOG.error("Error while registering metric.", e);
-		}
-	}
-
-	/**
-	 * Un-registers the given {@link org.apache.flink.metrics.Metric} with this registry.
-	 *
-	 * @param metric      the metric that should be removed
-	 * @param metricName  the name of the metric
-	 * @param group       the group that contains the metric
-	 */
-	public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
-		try {
-			if (reporter != null) {
-				reporter.notifyOfRemovedMetric(metric, metricName, group);
-			}
-		} catch (Exception e) {
-			LOG.error("Error while registering metric.", e);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	
-	static Configuration createReporterConfig(Configuration config, TimeUnit timeunit, long period) {
-		Configuration reporterConfig = new Configuration();
-		reporterConfig.setLong("period", period);
-		reporterConfig.setString("timeunit", timeunit.name());
-
-		String[] arguments = config.getString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "").split(" ");
-		if (arguments.length > 1) {
-			for (int x = 0; x < arguments.length; x += 2) {
-				reporterConfig.setString(arguments[x].replace("--", ""), arguments[x + 1]);
-			}
-		}
-		return reporterConfig;
-	}
-
-	static ScopeFormats createScopeConfig(Configuration config) {
-		String jmFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
-		String jmJobFormat = config.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
-		String tmFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-		String tmJobFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
-		String taskFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
-		String operatorFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
-		
-		return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This task is explicitly a static class, so that it does not hold any references to the enclosing
-	 * MetricsRegistry instance.
-	 * 
-	 * This is a subtle difference, but very important: With this static class, the enclosing class instance
-	 * may become garbage-collectible, whereas with an anonymous inner class, the timer thread
-	 * (which is a GC root) will hold a reference via the timer task and its enclosing instance pointer.
-	 * Making the MetricsRegistry garbage collectible makes the java.util.Timer garbage collectible,
-	 * which acts as a fail-safe to stop the timer thread and prevents resource leaks.
-	 */
-	private static final class ReporterTask extends TimerTask {
-
-		private final Scheduled reporter;
-
-		private ReporterTask(Scheduled reporter) {
-			this.reporter = reporter;
-		}
-
-		@Override
-		public void run() {
-			try {
-				reporter.report();
-			} catch (Throwable t) {
-				LOG.warn("Error while reporting metrics", t);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
deleted file mode 100644
index 9720b08..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics;
-
-/**
- * A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not thread-safe.
- */
-public class SimpleCounter implements Counter {
-	private long count;
-
-	/**
-	 * Increment the current count by 1.
-	 */
-	@Override
-	public void inc() {
-		count++;
-	}
-
-	/**
-	 * Increment the current count by the given value.
-	 *
-	 * @param n value to increment the current count by
-	 */
-	@Override
-	public void inc(long n) {
-		count += n;
-	}
-
-	/**
-	 * Decrement the current count by 1.
-	 */
-	@Override
-	public void dec() {
-		count--;
-	}
-
-	/**
-	 * Decrement the current count by the given value.
-	 *
-	 * @param n value to decrement the current count by
-	 */
-	@Override
-	public void dec(long n) {
-		count -= n;
-	}
-
-	/**
-	 * Returns the current count.
-	 *
-	 * @return current count
-	 */
-	@Override
-	public long getCount() {
-		return count;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
deleted file mode 100644
index dda6e4d..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.CharacterFilter;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.SimpleCounter;
-
-import org.apache.flink.metrics.groups.scope.ScopeFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Abstract {@link MetricGroup} that contains key functionality for adding metrics and groups.
- * 
- * <p><b>IMPORTANT IMPLEMENTATION NOTE</b>
- * 
- * <p>This class uses locks for adding and removing metrics objects. This is done to
- * prevent resource leaks in the presence of concurrently closing a group and adding
- * metrics and subgroups.
- * Since closing groups recursively closes the subgroups, the lock acquisition order must
- * be strictly from parent group to subgroup. If at any point, a subgroup holds its group
- * lock and calls a parent method that also acquires the lock, it will create a deadlock
- * condition.
- *
- * <p>An AbstractMetricGroup can be {@link #close() closed}. Upon closing, the group de-register all metrics
- * from any metrics reporter and any internal maps. Note that even closed metrics groups
- * return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code.
- * These metrics simply do not get reported any more, when created on a closed group.
- */
-@Internal
-public abstract class AbstractMetricGroup implements MetricGroup {
-
-	/** shared logger */
-	private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
-
-	// ------------------------------------------------------------------------
-
-	/** The registry that this metrics group belongs to */
-	protected final MetricRegistry registry;
-
-	/** All metrics that are directly contained in this group */
-	private final Map<String, Metric> metrics = new HashMap<>();
-
-	/** All metric subgroups of this group */
-	private final Map<String, AbstractMetricGroup> groups = new HashMap<>();
-
-	/** The metrics scope represented by this group.
-	 *  For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */
-	private final String[] scopeComponents;
-
-	/** The metrics scope represented by this group, as a concatenated string, lazily computed.
-	 * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
-	private String scopeString;
-
-	/** Flag indicating whether this group has been closed */
-	private volatile boolean closed;
-
-	// ------------------------------------------------------------------------
-
-	public AbstractMetricGroup(MetricRegistry registry, String[] scope) {
-		this.registry = checkNotNull(registry);
-		this.scopeComponents = checkNotNull(scope);
-	}
-
-	/**
-	 * Gets the scope as an array of the scope components, for example
-	 * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
-	 */
-	public String[] getScopeComponents() {
-		return scopeComponents;
-	}
-
-	/**
-	 * Returns the fully qualified metric name, for example
-	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
-	 * 
-	 * @param metricName metric name
-	 * @return fully qualified metric name
-     */
-	public String getMetricIdentifier(String metricName) {
-		return getMetricIdentifier(metricName, null);
-	}
-
-	/**
-	 * Returns the fully qualified metric name, for example
-	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
-	 *
-	 * @param metricName metric name
-	 * @param filter character filter which is applied to the scope components if not null.
-	 * @return fully qualified metric name
-	 */
-	public String getMetricIdentifier(String metricName, CharacterFilter filter) {
-		if (scopeString == null) {
-			if (filter != null) {
-				scopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents);
-			} else {
-				scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents);
-			}
-		}
-
-		if (filter != null) {
-			return scopeString + registry.getDelimiter() + filter.filterCharacters(metricName);
-		} else {
-			return scopeString + registry.getDelimiter() + metricName;
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Closing
-	// ------------------------------------------------------------------------
-
-	public void close() {
-		synchronized (this) {
-			if (!closed) {
-				closed = true;
-
-				// close all subgroups
-				for (AbstractMetricGroup group : groups.values()) {
-					group.close();
-				}
-				groups.clear();
-
-				// un-register all directly contained metrics
-				for (Map.Entry<String, Metric> metric : metrics.entrySet()) {
-					registry.unregister(metric.getValue(), metric.getKey(), this);
-				}
-				metrics.clear();
-			}
-		}
-	}
-
-	public final boolean isClosed() {
-		return closed;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//  Metrics
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@Override
-	public Counter counter(int name) {
-		return counter(String.valueOf(name));
-	}
-
-	@Override
-	public Counter counter(String name) {
-		return counter(name, new SimpleCounter());
-	}
-	
-	@Override
-	public <C extends Counter> C counter(int name, C counter) {
-		return counter(String.valueOf(name), counter);
-	}
-
-	@Override
-	public <C extends Counter> C counter(String name, C counter) {
-		addMetric(name, counter);
-		return counter;
-	}
-
-	@Override
-	public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
-		return gauge(String.valueOf(name), gauge);
-	}
-
-	@Override
-	public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
-		addMetric(name, gauge);
-		return gauge;
-	}
-
-	@Override
-	public <H extends Histogram> H histogram(int name, H histogram) {
-		return histogram(String.valueOf(name), histogram);
-	}
-
-	@Override
-	public <H extends Histogram> H histogram(String name, H histogram) {
-		addMetric(name, histogram);
-		return histogram;
-	}
-
-	/**
-	 * Adds the given metric to the group and registers it at the registry, if the group
-	 * is not yet closed, and if no metric with the same name has been registered before.
-	 * 
-	 * @param name the name to register the metric under
-	 * @param metric the metric to register
-	 */
-	protected void addMetric(String name, Metric metric) {
-		// add the metric only if the group is still open
-		synchronized (this) {
-			if (!closed) {
-				// immediately put without a 'contains' check to optimize the common case (no collition)
-				// collisions are resolved later
-				Metric prior = metrics.put(name, metric);
-
-				// check for collisions with other metric names
-				if (prior == null) {
-					// no other metric with this name yet
-
-					if (groups.containsKey(name)) {
-						// we warn here, rather than failing, because metrics are tools that should not fail the
-						// program when used incorrectly
-						LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
-								name + "'. Metric might not get properly reported. (" + scopeString + ')');
-					}
-
-					registry.register(metric, name, this);
-				}
-				else {
-					// we had a collision. put back the original value
-					metrics.put(name, prior);
-					
-					// we warn here, rather than failing, because metrics are tools that should not fail the
-					// program when used incorrectly
-					LOG.warn("Name collision: Group already contains a Metric with the name '" +
-							name + "'. Metric will not be reported. (" + scopeString + ')');
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Groups
-	// ------------------------------------------------------------------------
-
-	@Override
-	public MetricGroup addGroup(int name) {
-		return addGroup(String.valueOf(name));
-	}
-
-	@Override
-	public MetricGroup addGroup(String name) {
-		synchronized (this) {
-			if (!closed) {
-				// adding a group with the same name as a metric creates problems in many reporters/dashboards
-				// we warn here, rather than failing, because metrics are tools that should not fail the
-				// program when used incorrectly
-				if (metrics.containsKey(name)) {
-					LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
-							name + "'. Metric might not get properly reported. (" + scopeString + ')');
-				}
-
-				AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name);
-				AbstractMetricGroup prior = groups.put(name, newGroup);
-				if (prior == null) {
-					// no prior group with that name
-					return newGroup;
-				} else {
-					// had a prior group with that name, add the prior group back
-					groups.put(name, prior);
-					return prior;
-				}
-			}
-			else {
-				// return a non-registered group that is immediately closed already
-				GenericMetricGroup closedGroup = new GenericMetricGroup(registry, this, name);
-				closedGroup.close();
-				return closedGroup;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
deleted file mode 100644
index 518d940..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.MetricRegistry;
-
-/**
- * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g., 
- * TaskManager, Job, Task, Operator).
- * 
- * <p>Usually, the scope of metrics is simply the hierarchy of the containing groups. For example
- * the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code "A"} would have a
- * fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the Metric's scope.
- *
- * <p>Component groups, however, have configurable scopes. This allow users to include or exclude
- * certain identifiers from the scope. The scope for metrics belonging to the "Task"
- * group could for example include the task attempt number (more fine grained identification), or
- * exclude it (for continuity of the namespace across failure and recovery).
- */
-@Internal
-public abstract class ComponentMetricGroup extends AbstractMetricGroup {
-
-	/**
-	 * Creates a new ComponentMetricGroup.
-	 *
-	 * @param registry     registry to register new metrics with
-	 * @param scope        the scope of the group
-	 */
-	public ComponentMetricGroup(
-			MetricRegistry registry,
-			String[] scope) {
-
-		super(registry, scope);
-	}
-
-	/**
-	 * Closes the component group by removing and closing all metrics and subgroups
-	 * (inherited from {@link AbstractMetricGroup}), plus closing and removing all dedicated
-	 * component subgroups.
-	 */
-	@Override
-	public void close() {
-		synchronized (this) {
-			if (!isClosed()) {
-				// remove all metrics and generic subgroups
-				super.close();
-
-				// remove and close all subcomponent metrics
-				for (ComponentMetricGroup group : subComponents()) {
-					group.close();
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  sub components
-	// ------------------------------------------------------------------------
-
-	protected abstract Iterable<? extends ComponentMetricGroup> subComponents();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
deleted file mode 100644
index ddcd73b..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.MetricRegistry;
-
-/**
- * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to hold
- * subgroups of metrics.
- */
-@Internal
-public class GenericMetricGroup extends AbstractMetricGroup {
-
-	public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
-		super(registry, makeScopeComponents(parent, name));
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static String[] makeScopeComponents(AbstractMetricGroup parent, String name) {
-		if (parent != null) {
-			String[] parentComponents = parent.getScopeComponents();
-			if (parentComponents != null && parentComponents.length > 0) {
-				String[] parts = new String[parentComponents.length + 1];
-				System.arraycopy(parentComponents, 0, parts, 0, parentComponents.length);
-				parts[parts.length - 1] = name;
-				return parts;
-			}
-		}
-		return new String[] { name };
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
deleted file mode 100644
index 90bc2a8..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.metrics.Counter;
-
-/**
- * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
- * forwarded to the parent task metric group.
- */
-public class IOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
-
-	private final Counter numBytesOut;
-	private final Counter numBytesInLocal;
-	private final Counter numBytesInRemote;
-
-	public IOMetricGroup(TaskMetricGroup parent) {
-		super(parent);
-
-		this.numBytesOut = counter("numBytesOut");
-		this.numBytesInLocal = counter("numBytesInLocal");
-		this.numBytesInRemote = counter("numBytesInRemote");
-	}
-
-	public Counter getBytesOutCounter() {
-		return numBytesOut;
-	}
-
-	public Counter getNumBytesInLocalCounter() {
-		return numBytesInLocal;
-	}
-
-	public Counter getNumBytesInRemoteCounter() {
-		return numBytesInRemote;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
deleted file mode 100644
index 1dd0439..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
- * a specific job, running on the JobManager.
- */
-@Internal
-public class JobManagerJobMetricGroup extends JobMetricGroup {
-
-	/** The metrics group that contains this group */
-	private final JobManagerMetricGroup parent;
-
-	public JobManagerJobMetricGroup(
-		MetricRegistry registry,
-		JobManagerMetricGroup parent,
-		JobID jobId,
-		@Nullable String jobName) {
-
-		this(registry, checkNotNull(parent), registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName);
-	}
-
-	public JobManagerJobMetricGroup(
-		MetricRegistry registry,
-		JobManagerMetricGroup parent,
-		JobManagerJobScopeFormat scopeFormat,
-		JobID jobId,
-		@Nullable String jobName) {
-
-		super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
-
-		this.parent = checkNotNull(parent);
-	}
-
-	public final JobManagerMetricGroup parent() {
-		return parent;
-	}
-
-	@Override
-	protected Iterable<? extends ComponentMetricGroup> subComponents() {
-		return Collections.emptyList();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
deleted file mode 100644
index 67e1117..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing a JobManager.
- *
- * <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do
- * not contain tasks any more
- */
-@Internal
-public class JobManagerMetricGroup extends ComponentMetricGroup {
-
-	private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>();
-
-	private final String hostname;
-
-	public JobManagerMetricGroup(MetricRegistry registry, String hostname) {
-		this(registry, registry.getScopeFormats().getJobManagerFormat(), hostname);
-	}
-
-	public JobManagerMetricGroup(
-		MetricRegistry registry,
-		JobManagerScopeFormat scopeFormat,
-		String hostname) {
-
-		super(registry, scopeFormat.formatScope(hostname));
-		this.hostname = hostname;
-	}
-
-	public String hostname() {
-		return hostname;
-	}
-
-	// ------------------------------------------------------------------------
-	//  job groups
-	// ------------------------------------------------------------------------
-
-	public JobManagerJobMetricGroup addJob(
-		JobID jobId,
-		String jobName) {
-		// get or create a jobs metric group
-		JobManagerJobMetricGroup currentJobGroup;
-		synchronized (this) {
-			if (!isClosed()) {
-				currentJobGroup = jobs.get(jobId);
-
-				if (currentJobGroup == null || currentJobGroup.isClosed()) {
-					currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName);
-					jobs.put(jobId, currentJobGroup);
-				}
-				return currentJobGroup;
-			} else {
-				return null;
-			}
-		}
-	}
-
-	public void removeJob(JobID jobId) {
-		if (jobId == null) {
-			return;
-		}
-
-		synchronized (this) {
-			JobManagerJobMetricGroup containedGroup = jobs.remove(jobId);
-			if (containedGroup != null) {
-				containedGroup.close();
-			}
-		}
-	}
-
-	public int numRegisteredJobMetricGroups() {
-		return jobs.size();
-	}
-
-	@Override
-	protected Iterable<? extends ComponentMetricGroup> subComponents() {
-		return jobs.values();
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
deleted file mode 100644
index f7dfc78..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.MetricRegistry;
-
-import javax.annotation.Nullable;
-
-/**
- * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
- * a specific job.
- */
-@Internal
-public abstract class JobMetricGroup extends ComponentMetricGroup {
-
-	/** The ID of the job represented by this metrics group */
-	protected final JobID jobId;
-
-	/** The name of the job represented by this metrics group */
-	@Nullable
-	protected final String jobName;
-
-	// ------------------------------------------------------------------------
-
-	protected JobMetricGroup(
-			MetricRegistry registry,
-			JobID jobId,
-			@Nullable String jobName,
-			String[] scope) {
-		super(registry, scope);
-		
-		this.jobId = jobId;
-		this.jobName = jobName;
-	}
-
-	public JobID jobId() {
-		return jobId;
-	}
-
-	@Nullable
-	public String jobName() {
-		return jobName;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
deleted file mode 100644
index 6db79ab..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
-
-import java.util.Collections;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator.
- */
-@Internal
-public class OperatorMetricGroup extends ComponentMetricGroup {
-
-	/** The task metric group that contains this operator metric groups */
-	private final TaskMetricGroup parent;
-
-	public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, String operatorName) {
-		this(registry, parent, registry.getScopeFormats().getOperatorFormat(), operatorName);
-	}
-
-	public OperatorMetricGroup(
-			MetricRegistry registry,
-			TaskMetricGroup parent,
-			OperatorScopeFormat scopeFormat,
-			String operatorName) {
-
-		super(registry, scopeFormat.formatScope(parent, operatorName));
-		this.parent = checkNotNull(parent);
-	}
-
-	// ------------------------------------------------------------------------
-	
-	public final TaskMetricGroup parent() {
-		return parent;
-	}
-	
-	@Override
-	protected Iterable<? extends ComponentMetricGroup> subComponents() {
-		return Collections.emptyList();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java
deleted file mode 100644
index 14ff367..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Metric group which forwards all registration calls to its parent metric group.
- *
- * @param <P> Type of the parent metric group
- */
-@Internal
-public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
-	private final P parentMetricGroup;
-
-	public ProxyMetricGroup(P parentMetricGroup) {
-		this.parentMetricGroup = Preconditions.checkNotNull(parentMetricGroup);
-	}
-
-	@Override
-	public final Counter counter(int name) {
-		return parentMetricGroup.counter(name);
-	}
-
-	@Override
-	public final Counter counter(String name) {
-		return parentMetricGroup.counter(name);
-	}
-
-	@Override
-	public final <C extends Counter> C counter(int name, C counter) {
-		return parentMetricGroup.counter(name, counter);
-	}
-
-	@Override
-	public final <C extends Counter> C counter(String name, C counter) {
-		return parentMetricGroup.counter(name, counter);
-	}
-
-	@Override
-	public final <T, G extends Gauge<T>> G gauge(int name, G gauge) {
-		return parentMetricGroup.gauge(name, gauge);
-	}
-
-	@Override
-	public final <T, G extends Gauge<T>> G gauge(String name, G gauge) {
-		return parentMetricGroup.gauge(name, gauge);
-	}
-
-	@Override
-	public final <H extends Histogram> H histogram(String name, H histogram) {
-		return parentMetricGroup.histogram(name, histogram);
-	}
-
-	@Override
-	public final <H extends Histogram> H histogram(int name, H histogram) {
-		return parentMetricGroup.histogram(name, histogram);
-	}
-
-	@Override
-	public final MetricGroup addGroup(int name) {
-		return parentMetricGroup.addGroup(name);
-	}
-
-	@Override
-	public final MetricGroup addGroup(String name) {
-		return parentMetricGroup.addGroup(name);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
deleted file mode 100644
index fdaf1de..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
-import org.apache.flink.util.AbstractID;
-
-import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
- * a specific job, running on the TaskManager.
- *
- * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
- */
-@Internal
-public class TaskManagerJobMetricGroup extends JobMetricGroup {
-
-	/** The metrics group that contains this group */
-	private final TaskManagerMetricGroup parent;
-
-	/** Map from execution attempt ID (task identifier) to task metrics */
-	private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
-
-	// ------------------------------------------------------------------------
-
-	public TaskManagerJobMetricGroup(
-		MetricRegistry registry,
-		TaskManagerMetricGroup parent,
-		JobID jobId,
-		@Nullable String jobName) {
-
-		this(registry, checkNotNull(parent), registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName);
-	}
-
-	public TaskManagerJobMetricGroup(
-		MetricRegistry registry,
-		TaskManagerMetricGroup parent,
-		TaskManagerJobScopeFormat scopeFormat,
-		JobID jobId,
-		@Nullable String jobName) {
-
-		super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
-
-		this.parent = checkNotNull(parent);
-	}
-
-	public final TaskManagerMetricGroup parent() {
-		return parent;
-	}
-
-	// ------------------------------------------------------------------------
-	//  adding / removing tasks
-	// ------------------------------------------------------------------------
-
-	public TaskMetricGroup addTask(
-		AbstractID vertexId,
-		AbstractID executionId,
-		String taskName,
-		int subtaskIndex,
-		int attemptNumber) {
-
-		checkNotNull(executionId);
-
-		synchronized (this) {
-			if (!isClosed()) {
-				TaskMetricGroup task = new TaskMetricGroup(registry, this,
-					vertexId, executionId, taskName, subtaskIndex, attemptNumber);
-				tasks.put(executionId, task);
-				return task;
-			} else {
-				return null;
-			}
-		}
-	}
-
-	public void removeTaskMetricGroup(AbstractID executionId) {
-		checkNotNull(executionId);
-
-		boolean removeFromParent = false;
-		synchronized (this) {
-			if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) {
-				// this call removed the last task. close this group.
-				removeFromParent = true;
-				close();
-			}
-		}
-
-		// IMPORTANT: removing from the parent must not happen while holding the this group's lock,
-		//      because it would violate the "first parent then subgroup" lock acquisition order
-		if (removeFromParent) {
-			parent.removeJobMetricsGroup(jobId, this);
-		}
-	}
-
-	@Override
-	protected Iterable<? extends ComponentMetricGroup> subComponents() {
-		return tasks.values();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
deleted file mode 100644
index 2b2b201..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
-import org.apache.flink.util.AbstractID;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing a TaskManager.
- *
- * <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do
- * not contain tasks any more
- */
-@Internal
-public class TaskManagerMetricGroup extends ComponentMetricGroup {
-
-	private final Map<JobID, TaskManagerJobMetricGroup> jobs = new HashMap<>();
-
-	private final String hostname;
-
-	private final String taskManagerId;
-
-
-	public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) {
-		this(registry, registry.getScopeFormats().getTaskManagerFormat(), hostname, taskManagerId);
-	}
-
-	public TaskManagerMetricGroup(
-			MetricRegistry registry,
-			TaskManagerScopeFormat scopeFormat,
-			String hostname, String taskManagerId) {
-
-		super(registry, scopeFormat.formatScope(hostname, taskManagerId));
-		this.hostname = hostname;
-		this.taskManagerId = taskManagerId;
-	}
-
-	public String hostname() {
-		return hostname;
-	}
-
-	public String taskManagerId() {
-		return taskManagerId;
-	}
-
-	// ------------------------------------------------------------------------
-	//  job groups
-	// ------------------------------------------------------------------------
-
-	public TaskMetricGroup addTaskForJob(
-			JobID jobId,
-			String jobName,
-			AbstractID vertexID,
-			AbstractID executionId,
-			String taskName,
-			int subtaskIndex,
-			int attemptNumber) {
-
-		// we cannot strictly lock both our map modification and the job group modification
-		// because it might lead to a deadlock
-		while (true) {
-			// get or create a jobs metric group
-			TaskManagerJobMetricGroup currentJobGroup;
-			synchronized (this) {
-				currentJobGroup = jobs.get(jobId);
-
-				if (currentJobGroup == null || currentJobGroup.isClosed()) {
-					currentJobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, jobName);
-					jobs.put(jobId, currentJobGroup);
-				}
-			}
-
-			// try to add another task. this may fail if we found a pre-existing job metrics
-			// group and it is closed concurrently
-			TaskMetricGroup taskGroup = currentJobGroup.addTask(
-					vertexID, executionId, taskName, subtaskIndex, attemptNumber);
-
-			if (taskGroup != null) {
-				// successfully added the next task
-				return taskGroup;
-			}
-
-			// else fall through the loop
-		}
-	}
-
-	public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup group) {
-		if (jobId == null || group == null || !group.isClosed()) {
-			return;
-		}
-
-		synchronized (this) {
-			// optimistically remove the currently contained group, and check later if it was correct
-			TaskManagerJobMetricGroup containedGroup = jobs.remove(jobId);
-
-			// check if another group was actually contained, and restore that one
-			if (containedGroup != null && containedGroup != group) {
-				jobs.put(jobId, containedGroup);
-			}
-		}
-	}
-
-	public int numRegisteredJobMetricGroups() {
-		return jobs.size();
-	}
-
-	@Override
-	protected Iterable<? extends ComponentMetricGroup> subComponents() {
-		return jobs.values();
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
deleted file mode 100644
index 5849b4b..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
-import org.apache.flink.util.AbstractID;
-
-import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing a Flink runtime Task.
- * 
- * <p>Contains extra logic for adding operators.
- */
-@Internal
-public class TaskMetricGroup extends ComponentMetricGroup {
-
-	/** The job metrics group containing this task metrics group */
-	private final TaskManagerJobMetricGroup parent;
-
-	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
-
-	private final IOMetricGroup ioMetrics;
-	
-	/** The execution Id uniquely identifying the executed task represented by this metrics group */
-	private final AbstractID executionId;
-
-	@Nullable
-	private final AbstractID vertexId;
-	
-	@Nullable
-	private final String taskName;
-
-	private final int subtaskIndex;
-
-	private final int attemptNumber;
-
-	// ------------------------------------------------------------------------
-
-	public TaskMetricGroup(
-			MetricRegistry registry,
-			TaskManagerJobMetricGroup parent,
-			@Nullable AbstractID vertexId,
-			AbstractID executionId,
-			@Nullable String taskName,
-			int subtaskIndex,
-			int attemptNumber) {
-		
-		this(registry, parent, registry.getScopeFormats().getTaskFormat(),
-				vertexId, executionId, taskName, subtaskIndex, attemptNumber);
-	}
-
-	public TaskMetricGroup(
-			MetricRegistry registry,
-			TaskManagerJobMetricGroup parent,
-			TaskScopeFormat scopeFormat, 
-			@Nullable AbstractID vertexId,
-			AbstractID executionId,
-			@Nullable String taskName,
-			int subtaskIndex,
-			int attemptNumber) {
-
-		super(registry, scopeFormat.formatScope(
-				parent, vertexId, executionId, taskName, subtaskIndex, attemptNumber));
-
-		this.parent = checkNotNull(parent);
-		this.executionId = checkNotNull(executionId);
-		this.vertexId = vertexId;
-		this.taskName = taskName;
-		this.subtaskIndex = subtaskIndex;
-		this.attemptNumber = attemptNumber;
-
-		this.ioMetrics = new IOMetricGroup(this);
-	}
-
-	// ------------------------------------------------------------------------
-	//  properties
-	// ------------------------------------------------------------------------
-
-	public final TaskManagerJobMetricGroup parent() {
-		return parent;
-	}
-
-	public AbstractID executionId() {
-		return executionId;
-	}
-
-	@Nullable
-	public AbstractID vertexId() {
-		return vertexId;
-	}
-
-	@Nullable
-	public String taskName() {
-		return taskName;
-	}
-
-	public int subtaskIndex() {
-		return subtaskIndex;
-	}
-
-	public int attemptNumber() {
-		return attemptNumber;
-	}
-
-	/**
-	 * Returns the IOMetricGroup for this task.
-	 *
-	 * @return IOMetricGroup for this task.
-	 */
-	public IOMetricGroup getIOMetricGroup() {
-		return ioMetrics;
-	}
-
-	// ------------------------------------------------------------------------
-	//  operators and cleanup
-	// ------------------------------------------------------------------------
-	public OperatorMetricGroup addOperator(String name) {
-		OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name);
-
-		synchronized (this) {
-			OperatorMetricGroup previous = operators.put(name, operator);
-			if (previous == null) {
-				// no operator group so far
-				return operator;
-			} else {
-				// already had an operator group. restore that one.
-				operators.put(name, previous);
-				return previous;
-			}
-		}
-	}
-
-	@Override
-	public void close() {
-		super.close();
-
-		parent.removeTaskMetricGroup(executionId);
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	protected Iterable<? extends ComponentMetricGroup> subComponents() {
-		return operators.values();
-	}
-}


Mime
View raw message