flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/6] flink git commit: [FLINK-1502] [core] Add basic metric system
Date Sun, 22 May 2016 18:58:16 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
index 858bc49..83c88cc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.junit.Test;
 
 
@@ -41,7 +42,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableNotFound() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(),new HashMap<String, Accumulator<?, ?>>());
+			RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(),new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup());
 			
 			try {
 				ctx.getBroadcastVariable("some name");
@@ -71,7 +72,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableSimple() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup());
 			
 			ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
 			ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
@@ -105,7 +106,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableWithInitializer() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup());
 			
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 			
@@ -130,7 +131,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testResetBroadcastVariableWithInitializer() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup());
 			
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 			
@@ -153,7 +154,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableWithInitializerAndMismatch() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup());
 			
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
index 7ea0071..554820e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.junit.Test;
@@ -40,7 +41,7 @@ public class RichInputFormatTest {
 	public void testCheckRuntimeContextAccess() {
 		final SerializedInputFormat<Value> inputFormat = new SerializedInputFormat<Value>();
 		final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0);
-		inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()));
+		inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()));
 
 		Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
 		Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
index 273f4f5..09db3a9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.junit.Test;
@@ -41,7 +42,7 @@ public class RichOutputFormatTest {
 	public void testCheckRuntimeContextAccess() {
 		final SerializedOutputFormat<Value> inputFormat = new SerializedOutputFormat<Value>();
 		final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0);
-		inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()));
+		inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()));
 
 		Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
 		Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
index 5ca4c4c..7c905c1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.common.operators.util.TestRichOutputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.apache.flink.types.Nothing;
 import org.junit.Test;
 
@@ -94,13 +95,13 @@ public class GenericDataSinkBaseTest implements java.io.Serializable {
 			final TaskInfo taskInfo = new TaskInfo("test_sink", 0, 1, 0);
 			executionConfig.disableObjectReuse();
 			in.reset();
-			sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig);
 			assertEquals(out.output, asList(TestIOData.RICH_NAMES));
 
 			executionConfig.enableObjectReuse();
 			out.clear();
 			in.reset();
-			sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig);
 			assertEquals(out.output, asList(TestIOData.RICH_NAMES));
 		} catch(Exception e){
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
index 083039a..c360c62 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.common.operators.util.TestRichInputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -83,7 +84,7 @@ public class GenericDataSourceBaseTest implements java.io.Serializable {
 			executionConfig.disableObjectReuse();
 			assertEquals(false, in.hasBeenClosed());
 			assertEquals(false, in.hasBeenOpened());
-			List<String> resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			List<String> resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig);
 			assertEquals(true, in.hasBeenClosed());
 			assertEquals(true, in.hasBeenOpened());
 
@@ -91,7 +92,7 @@ public class GenericDataSourceBaseTest implements java.io.Serializable {
 			executionConfig.enableObjectReuse();
 			assertEquals(false, in.hasBeenClosed());
 			assertEquals(false, in.hasBeenOpened());
-			List<String> resultRegular = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			List<String> resultRegular = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig);
 			assertEquals(true, in.hasBeenClosed());
 			assertEquals(true, in.hasBeenOpened());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
index cda3245..9447efd 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
@@ -78,7 +79,7 @@ public class FlatMapOperatorCollectionTest implements Serializable {
 		final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 4, 0);
 		// run on collections
 		final List<String> result = getTestFlatMapOperator(udf)
-				.executeOnCollections(input, new RuntimeUDFContext(taskInfo,  null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+				.executeOnCollections(input, new RuntimeUDFContext(taskInfo,  null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
 
 		Assert.assertEquals(input.size(), result.size());
 		Assert.assertEquals(input, result);

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
index d119fe2..a610a4d 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -125,9 +126,9 @@ public class InnerJoinOperatorBaseTest implements Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig);
 
 			assertEquals(expected, resultSafe);
 			assertEquals(expected, resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
index 6059ab1..7ecdefa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -112,9 +113,9 @@ public class MapOperatorTest implements java.io.Serializable {
 			final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0);
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig);
 			
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
index 71486a5..5012718 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
@@ -85,9 +86,9 @@ public class PartitionMapOperatorTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
 			
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricGroupTest.java
new file mode 100644
index 0000000..f7502e5
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricGroupTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.util.TestReporter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MetricGroupTest {
+	/**
+	 * Verifies that group methods instantiate the correct metric with the given name.
+	 */
+	@Test
+	public void testMetricInstantiation() {
+		Configuration config = new Configuration();
+
+		config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter1.class.getName());
+
+		MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
+
+		root.counter("counter");
+		Assert.assertTrue(TestReporter1.lastPassedMetric instanceof Counter);
+		Assert.assertEquals("counter", TestReporter1.lastPassedName);
+
+		root.gauge("gauge", new Gauge<Object>() {
+			@Override
+			public Object getValue() {
+				return null;
+			}
+		});
+		Assert.assertTrue(TestReporter1.lastPassedMetric instanceof Gauge);
+		Assert.assertEquals("gauge", TestReporter1.lastPassedName);
+	}
+
+	protected static class TestReporter1 extends TestReporter {
+		public static Metric lastPassedMetric;
+		public static String lastPassedName;
+
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String name) {
+			lastPassedMetric = metric;
+			lastPassedName = name;
+		}
+
+		@Override
+		public void notifyOfRemovedMetric(Metric metric, String name) {
+		}
+	}
+
+	/**
+	 * Verifies that metric names containing special characters are rejected.
+	 */
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidMetricName() {
+		Configuration config = new Configuration();
+
+		MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
+		root.counter("=)(/!");
+	}
+
+	/**
+	 * Verifies that when attempting to create a group with the name of an existing one the existing one will be returned instead.
+	 */
+	@Test
+	public void testDuplicateGroupName() {
+		Configuration config = new Configuration();
+
+		MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
+
+		MetricGroup group1 = root.addGroup("group");
+		MetricGroup group2 = root.addGroup("group");
+		MetricGroup group3 = root.addGroup("group");
+		Assert.assertTrue(group1 == group2 && group2 == group3);
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
new file mode 100644
index 0000000..32cc11c
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.JobMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.Scope;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.metrics.util.TestReporter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class MetricRegistryTest {
+	/**
+	 * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
+	 */
+	@Test
+	public void testReporterInstantiation() {
+		Configuration config = new Configuration();
+
+		config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter1.class.getName());
+
+		new MetricRegistry(config);
+
+		Assert.assertTrue(TestReporter1.wasOpened);
+	}
+
+	protected static class TestReporter1 extends TestReporter {
+		public static boolean wasOpened = false;
+
+		@Override
+		public void open(Configuration config) {
+			wasOpened = true;
+		}
+	}
+
+	/**
+	 * Verifies that configured arguments are properly forwarded to the reporter.
+	 */
+	@Test
+	public void testReporterArgumentForwarding() {
+		Configuration config = new Configuration();
+
+		config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter2.class.getName());
+		config.setString(MetricRegistry.KEY_METRICS_REPORTER_ARGUMENTS, "--arg1 hello --arg2 world");
+
+		new MetricRegistry(config);
+	}
+
+	protected static class TestReporter2 extends TestReporter {
+		@Override
+		public void open(Configuration config) {
+			Assert.assertEquals("hello", config.getString("arg1", null));
+			Assert.assertEquals("world", config.getString("arg2", null));
+		}
+	}
+
+	/**
+	 * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics.
+	 *
+	 * @throws InterruptedException if the test thread is interrupted while sleeping between checks
+	 */
+	@Test
+	public void testReporterScheduling() throws InterruptedException {
+		Configuration config = new Configuration();
+
+		config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter3.class.getName());
+		config.setString(MetricRegistry.KEY_METRICS_REPORTER_INTERVAL, "50 MILLISECONDS");
+
+		new MetricRegistry(config);
+
+		long start = System.currentTimeMillis();
+		for (int x = 0; x < 10; x++) {
+			Thread.sleep(100);
+			int reportCount = TestReporter3.reportCount;
+			long curT = System.currentTimeMillis();
+			/**
+			 * Within a given time-frame T only T/50 reports may be triggered due to the 50 ms interval between reports.
+			 * This value, however, does not take the first triggered report into account (=> +1).
+			 * Furthermore, we have to account for the misalignment between reports being triggered and our time
+			 * measurement (=> +1); for T=200 a total of 4-6 reports may have been
+			 * triggered, depending on whether the interval of the first report ends before
+			 * or after T=50.
+			 */
+			long maxAllowedReports = (curT - start) / 50 + 2;
+			Assert.assertTrue("Too many report were triggered.", maxAllowedReports >= reportCount);
+		}
+		Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0);
+	}
+
+	protected static class TestReporter3 extends TestReporter implements Scheduled {
+		public static int reportCount = 0;
+
+		@Override
+		public void report() {
+			reportCount++;
+		}
+	}
+
+	/**
+	 * Verifies that groups are correctly created, nesting works, and metric names and scopes are properly forwarded to the reporter's name generation.
+	 */
+	@Test
+	public void testMetricGroupGeneration() {
+		Configuration config = new Configuration();
+
+		config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter4.class.getName());
+
+		MetricRegistry registry = new MetricRegistry(config);
+
+		MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
+		root.counter("rootCounter");
+		root.addGroup("top").counter("topCounter");
+	}
+
+	protected static class TestReporter4 extends TestReporter {
+		@Override
+		public String generateName(String name, List<String> scope) {
+			if (name.compareTo("rootCounter") == 0) {
+				Assert.assertEquals("host", scope.get(0));
+				return "success";
+			} else if (name.compareTo("topCounter") == 0) {
+				Assert.assertEquals("host", scope.get(0));
+				Assert.assertEquals("taskmanager", scope.get(1));
+				return "success";
+			} else {
+				Assert.fail();
+				return null;
+			}
+		}
+	}
+
+	/**
+	 * Verifies that reporters implementing the Listener interface are notified when Metrics are added or removed.
+	 */
+	@Test
+	public void testListener() {
+		Configuration config = new Configuration();
+
+		config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter6.class.getName());
+
+		MetricRegistry registry = new MetricRegistry(config);
+
+		MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
+		root.counter("rootCounter");
+		root.close();
+
+		Assert.assertTrue(TestReporter6.addCalled);
+		Assert.assertTrue(TestReporter6.removeCalled);
+	}
+
+	protected static class TestReporter6 extends TestReporter {
+		public static boolean addCalled = false;
+		public static boolean removeCalled = false;
+
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String name) {
+			addCalled = true;
+			Assert.assertTrue(metric instanceof Counter);
+			Assert.assertEquals("rootCounter", name);
+		}
+
+		@Override
+		public void notifyOfRemovedMetric(Metric metric, String name) {
+			removeCalled = true;
+			Assert.assertTrue(metric instanceof Counter);
+			Assert.assertEquals("rootCounter", name);
+		}
+	}
+
+	/**
+	 * Verifies that the scope configuration is properly extracted.
+	 */
+	@Test
+	public void testScopeConfig() {
+		Configuration config = new Configuration();
+
+		config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TM, "A");
+		config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_JOB, "B");
+		config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TASK, "C");
+		config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_OPERATOR, "D");
+
+		Scope.ScopeFormat scopeConfig = new MetricRegistry(config).getScopeConfig();
+
+		Assert.assertEquals("A", scopeConfig.getTaskManagerFormat());
+		Assert.assertEquals("B", scopeConfig.getJobFormat());
+		Assert.assertEquals("C", scopeConfig.getTaskFormat());
+		Assert.assertEquals("D", scopeConfig.getOperatorFormat());
+
+		Scope.ScopeFormat emptyScopeConfig = new MetricRegistry(new Configuration()).getScopeConfig();
+
+		Assert.assertEquals(TaskManagerMetricGroup.DEFAULT_SCOPE_TM, emptyScopeConfig.getTaskManagerFormat());
+		Assert.assertEquals(JobMetricGroup.DEFAULT_SCOPE_JOB, emptyScopeConfig.getJobFormat());
+		Assert.assertEquals(TaskMetricGroup.DEFAULT_SCOPE_TASK, emptyScopeConfig.getTaskFormat());
+		Assert.assertEquals(OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR, emptyScopeConfig.getOperatorFormat());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
new file mode 100644
index 0000000..89483b3
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class JobGroupTest {
+	// Without an explicit format, the job name is expected as the fourth and last scope entry.
+	@Test
+	public void testGenerateScopeDefault() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		JobMetricGroup jobGroup = new TaskManagerMetricGroup(registry, "host", "id").addJob(new JobID(), "job");
+
+		List<String> scope = jobGroup.generateScope();
+		assertEquals(4, scope.size());
+		assertEquals("job", scope.get(3));
+	}
+
+	// Job format starting with the wildcard: "superjob" and the job name are expected at positions 3 and 4.
+	@Test
+	public void testGenerateScopeWildcard() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		JobMetricGroup jobGroup = new TaskManagerMetricGroup(registry, "host", "id").addJob(new JobID(), "job");
+
+		Scope.ScopeFormat scopeFormat = new Scope.ScopeFormat();
+		scopeFormat.setJobFormat(Scope.concat(Scope.SCOPE_WILDCARD, "superjob", JobMetricGroup.SCOPE_JOB_NAME));
+
+		List<String> scope = jobGroup.generateScope(scopeFormat);
+		assertEquals(5, scope.size());
+		assertEquals("superjob", scope.get(3));
+		assertEquals("job", scope.get(4));
+	}
+
+	// Job format without the wildcard: exactly host, "superjob" and the job name are expected.
+	@Test
+	public void testGenerateScopeCustom() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		JobMetricGroup jobGroup = new TaskManagerMetricGroup(registry, "host", "id").addJob(new JobID(), "job");
+
+		Scope.ScopeFormat scopeFormat = new Scope.ScopeFormat();
+		scopeFormat.setJobFormat(Scope.concat(TaskManagerMetricGroup.SCOPE_TM_HOST, "superjob", JobMetricGroup.SCOPE_JOB_NAME));
+
+		List<String> scope = jobGroup.generateScope(scopeFormat);
+		assertEquals(3, scope.size());
+		assertEquals("host", scope.get(0));
+		assertEquals("superjob", scope.get(1));
+		assertEquals("job", scope.get(2));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
new file mode 100644
index 0000000..4f33d2a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.util.AbstractID;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class OperatorGroupTest {
+	@Test
+	public void testGenerateScopeDefault() {
+		OperatorMetricGroup operatorGroup = createOperatorGroup();
+
+		List<String> scope = operatorGroup.generateScope();
+		assertEquals(6, scope.size());
+		assertEquals("host", scope.get(0));
+		assertEquals("taskmanager", scope.get(1));
+		assertEquals("id", scope.get(2));
+		assertEquals("job", scope.get(3));
+		assertEquals("operator", scope.get(4));
+		assertEquals("0", scope.get(5));
+	}
+
+	@Test
+	public void testGenerateScopeWildcard() {
+		OperatorMetricGroup operatorGroup = createOperatorGroup();
+
+		Scope.ScopeFormat scopeFormat = new Scope.ScopeFormat();
+		scopeFormat.setOperatorFormat(Scope.concat(Scope.SCOPE_WILDCARD, "op", OperatorMetricGroup.SCOPE_OPERATOR_NAME));
+
+		List<String> scope = operatorGroup.generateScope(scopeFormat);
+		assertEquals(7, scope.size());
+		assertEquals("host", scope.get(0));
+		assertEquals("taskmanager", scope.get(1));
+		assertEquals("id", scope.get(2));
+		assertEquals("job", scope.get(3));
+		assertEquals("task", scope.get(4));
+		assertEquals("op", scope.get(5));
+		assertEquals("operator", scope.get(6));
+	}
+
+	@Test
+	public void testGenerateScopeCustom() {
+		// the format below contains no wildcard; only its own four components are expected in the scope
+		OperatorMetricGroup operatorGroup = createOperatorGroup();
+
+		Scope.ScopeFormat scopeFormat = new Scope.ScopeFormat();
+		scopeFormat.setOperatorFormat(Scope.concat("jobs", JobMetricGroup.SCOPE_JOB_NAME, "op", OperatorMetricGroup.SCOPE_OPERATOR_NAME));
+
+		List<String> scope = operatorGroup.generateScope(scopeFormat);
+		assertEquals(4, scope.size());
+		assertEquals("jobs", scope.get(0));
+		assertEquals("job", scope.get(1));
+		assertEquals("op", scope.get(2));
+		assertEquals("operator", scope.get(3));
+	}
+
+	/** Creates operator "operator" under task "task" (subtask index 0), job "job" and task manager "host"/"id". */
+	private static OperatorMetricGroup createOperatorGroup() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		return new TaskManagerMetricGroup(registry, "host", "id")
+			.addJob(new JobID(), "job")
+			.addTask(new AbstractID(), new AbstractID(), 0, "task").addOperator("operator");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
new file mode 100644
index 0000000..c49fdcd
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.util.AbstractID;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TaskGroupTest {
+	// Without an explicit format, the task name is expected as the fifth and last scope entry.
+	@Test
+	public void testGenerateScopeDefault() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		TaskMetricGroup taskGroup = new TaskManagerMetricGroup(registry, "host", "id")
+			.addJob(new JobID(), "job").addTask(new AbstractID(), new AbstractID(), 0, "task");
+
+		List<String> scope = taskGroup.generateScope();
+		assertEquals(5, scope.size());
+		assertEquals("task", scope.get(4));
+	}
+
+	// Task format starting with the wildcard: host, "taskmanager", id and job come first, then "supertask" and the task.
+	@Test
+	public void testGenerateScopeWildcard() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		TaskMetricGroup taskGroup = new TaskManagerMetricGroup(registry, "host", "id")
+			.addJob(new JobID(), "job").addTask(new AbstractID(), new AbstractID(), 0, "task");
+
+		Scope.ScopeFormat scopeFormat = new Scope.ScopeFormat();
+		scopeFormat.setTaskFormat(Scope.concat(Scope.SCOPE_WILDCARD, "supertask", TaskMetricGroup.SCOPE_TASK_NAME));
+
+		List<String> scope = taskGroup.generateScope(scopeFormat);
+		assertEquals(6, scope.size());
+		assertEquals("host", scope.get(0));
+		assertEquals("taskmanager", scope.get(1));
+		assertEquals("id", scope.get(2));
+		assertEquals("job", scope.get(3));
+		assertEquals("supertask", scope.get(4));
+		assertEquals("task", scope.get(5));
+	}
+
+	// Task format without the wildcard: exactly host, job name, "supertask" and task name are expected.
+	@Test
+	public void testGenerateScopeCustom() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		TaskMetricGroup taskGroup = new TaskManagerMetricGroup(registry, "host", "id")
+			.addJob(new JobID(), "job").addTask(new AbstractID(), new AbstractID(), 0, "task");
+
+		Scope.ScopeFormat scopeFormat = new Scope.ScopeFormat();
+		scopeFormat.setTaskFormat(Scope.concat(TaskManagerMetricGroup.SCOPE_TM_HOST, JobMetricGroup.SCOPE_JOB_NAME, "supertask", TaskMetricGroup.SCOPE_TASK_NAME));
+
+		List<String> scope = taskGroup.generateScope(scopeFormat);
+		assertEquals(4, scope.size());
+		assertEquals("host", scope.get(0));
+		assertEquals("job", scope.get(1));
+		assertEquals("supertask", scope.get(2));
+		assertEquals("task", scope.get(3));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
new file mode 100644
index 0000000..7b3286d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TaskManagerGroupTest {
+	// Without an explicit format the scope is expected to be host, "taskmanager", id.
+	@Test
+	public void testGenerateScopeDefault() {
+		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(new MetricRegistry(new Configuration()), "host", "id");
+
+		List<String> scope = tmGroup.generateScope();
+		assertEquals(3, scope.size());
+		assertEquals("host", scope.get(0));
+		assertEquals("taskmanager", scope.get(1));
+		assertEquals("id", scope.get(2));
+	}
+
+	// Task manager format starting with the wildcard: only "superhost" and the host name are expected.
+	@Test
+	public void testGenerateScopeWildcard() {
+		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(new MetricRegistry(new Configuration()), "host", "id");
+
+		Scope.ScopeFormat scopeFormat = new Scope.ScopeFormat();
+		scopeFormat.setTaskManagerFormat(Scope.concat(Scope.SCOPE_WILDCARD, "superhost", TaskManagerMetricGroup.SCOPE_TM_HOST));
+
+		List<String> scope = tmGroup.generateScope(scopeFormat);
+		assertEquals(2, scope.size());
+		assertEquals("superhost", scope.get(0));
+		assertEquals("host", scope.get(1));
+	}
+
+	// Format mixing constants and variables: "h", host, "t", id are expected in that order.
+	@Test
+	public void testGenerateScopeCustom() {
+		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(new MetricRegistry(new Configuration()), "host", "id");
+
+		Scope.ScopeFormat scopeFormat = new Scope.ScopeFormat();
+		scopeFormat.setTaskManagerFormat(Scope.concat("h", TaskManagerMetricGroup.SCOPE_TM_HOST, "t", TaskManagerMetricGroup.SCOPE_TM_ID));
+
+		List<String> scope = tmGroup.generateScope(scopeFormat);
+		assertEquals(4, scope.size());
+		assertEquals("h", scope.get(0));
+		assertEquals("host", scope.get(1));
+		assertEquals("t", scope.get(2));
+		assertEquals("id", scope.get(3));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
new file mode 100644
index 0000000..0d683c2
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.reporter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class JMXReporterTest {
+	/**
+	 * Checks the JMX name generated from a metric name and a three-element scope, where the
+	 * last scope value contains a quote, a space and several punctuation characters.
+	 */
+	@Test
+	public void testGenerateName() {
+		List<String> scopeComponents = new ArrayList<>();
+		scopeComponents.add("value0");
+		scopeComponents.add("value1");
+		scopeComponents.add("\"value2 (test),=;:?'");
+
+		String generated = new JMXReporter().generateName("metric", scopeComponents);
+		String expected = "org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=metric";
+
+		Assert.assertEquals(expected, generated);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
new file mode 100644
index 0000000..d607072
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.util;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.JobMetricGroup;
+import org.apache.flink.util.AbstractID;
+
+public class DummyJobMetricGroup extends JobMetricGroup { // test stub built on dummy parents
+	public DummyJobMetricGroup() {
+		super(new DummyMetricRegistry(), new DummyTaskManagerMetricGroup(), new JobID(), "job");
+	}
+
+	@Override
+	public MetricGroup addGroup(String name) {
+		return new DummyMetricGroup(); // the name is ignored; a new dummy group is returned each time
+	}
+
+	@Override
+	public MetricGroup addGroup(int name) {
+		return addGroup(String.valueOf(name));
+	}
+
+	@Override
+	protected MetricGroup addMetric(String name, Metric metric) {
+		return this; // the metric is not stored anywhere
+	}
+
+	@Override
+	public DummyTaskMetricGroup addTask(AbstractID id, AbstractID attemptID, int subtaskIndex, String name) {
+		return new DummyTaskMetricGroup(); // all arguments are ignored
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java
new file mode 100644
index 0000000..26df874
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.metrics.groups.Scope;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DummyMetricGroup extends AbstractMetricGroup { // test stub with an always-empty scope
+	public DummyMetricGroup() {
+		super(new DummyMetricRegistry());
+	}
+
+	@Override
+	public MetricGroup addGroup(String name) {
+		return new DummyMetricGroup(); // the name is ignored; a new dummy group is returned each time
+	}
+
+	@Override
+	public MetricGroup addGroup(int name) {
+		return addGroup(String.valueOf(name));
+	}
+
+	@Override
+	protected MetricGroup addMetric(String name, Metric metric) {
+		return this; // the metric is not stored anywhere
+	}
+
+	@Override
+	public List<String> generateScope() {
+		return new ArrayList<String>(); // a new, empty list on every call
+	}
+
+	@Override
+	public List<String> generateScope(Scope.ScopeFormat format) {
+		return new ArrayList<String>(); // the format is ignored
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
new file mode 100644
index 0000000..f0d6d3f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+
+public class DummyMetricRegistry extends MetricRegistry {
+	// Shared configuration that names DummyReporter as the reporter class.
+	private static final Configuration config;
+	static {
+		config = new Configuration();
+		config.setString(KEY_METRICS_REPORTER_CLASS, DummyReporter.class.getCanonicalName());
+	}
+
+	public DummyMetricRegistry() {
+		super(config); // hand over the prepared configuration rather than a fresh, empty one
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
new file mode 100644
index 0000000..eb45f6a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+
+public class DummyOperatorMetricGroup extends OperatorMetricGroup { // test stub built on dummy parents
+	public DummyOperatorMetricGroup() {
+		super(new DummyMetricRegistry(), new DummyTaskMetricGroup(), "operator", 0);
+	}
+
+	@Override
+	public MetricGroup addGroup(String name) {
+		return new DummyMetricGroup(); // the name is ignored; a new dummy group is returned each time
+	}
+
+	@Override
+	public MetricGroup addGroup(int name) {
+		return addGroup(String.valueOf(name));
+	}
+
+	@Override
+	protected MetricGroup addMetric(String name, Metric metric) {
+		return this; // the metric is not stored anywhere
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java
new file mode 100644
index 0000000..23a7768
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.reporter.MetricReporter;
+
+import java.util.List;
+
+/**
+ * {@link MetricReporter} stub for tests: every callback is empty and generated names are the empty string.
+ */
+public class DummyReporter implements MetricReporter {
+	@Override
+	public void open(Configuration config) {}
+
+	@Override
+	public void close() {}
+
+	@Override
+	public void notifyOfAddedMetric(Metric metric, String name) {}
+
+	@Override
+	public void notifyOfRemovedMetric(Metric metric, String name) {}
+
+	// Both arguments are ignored.
+	@Override
+	public String generateName(String name, List<String> scope) {
+		return "";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
new file mode 100644
index 0000000..1c7d33b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.util;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+
+public class DummyTaskManagerMetricGroup extends TaskManagerMetricGroup { // test stub named "host"/"id"
+	public DummyTaskManagerMetricGroup() {
+		super(new DummyMetricRegistry(), "host", "id");
+	}
+
+	public DummyJobMetricGroup addJob(JobID id, String name) {
+		return new DummyJobMetricGroup(); // both arguments are ignored
+	}
+
+	@Override
+	public MetricGroup addGroup(String name) {
+		return new DummyMetricGroup(); // the name is ignored; a new dummy group is returned each time
+	}
+
+	@Override
+	public MetricGroup addGroup(int name) {
+		return addGroup(String.valueOf(name));
+	}
+
+	@Override
+	protected MetricGroup addMetric(String name, Metric metric) {
+		return this; // the metric is not stored anywhere
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
new file mode 100644
index 0000000..53683f4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.util.AbstractID;
+
+public class DummyTaskMetricGroup extends TaskMetricGroup { // test stub built on dummy parents
+	public DummyTaskMetricGroup() {
+		super(new DummyMetricRegistry(), new DummyJobMetricGroup(), new AbstractID(), new AbstractID(), 0, "task");
+	}
+
+	public DummyOperatorMetricGroup addOperator(String name) {
+		return new DummyOperatorMetricGroup(); // the name is ignored
+	}
+
+	@Override
+	public MetricGroup addGroup(String name) {
+		return new DummyMetricGroup(); // the name is ignored; a new dummy group is returned each time
+	}
+
+	@Override
+	public MetricGroup addGroup(int name) {
+		return addGroup(String.valueOf(name));
+	}
+
+	@Override
+	protected MetricGroup addMetric(String name, Metric metric) {
+		return this; // the metric is not stored anywhere
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
new file mode 100644
index 0000000..482d1e8
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+
+import java.util.List;
+
+public class TestReporter extends AbstractReporter { // minimal reporter: empty open/close, name generation returns the input name
+	@Override
+	public void open(Configuration config) { // empty body; 'config' is not read
+	}
+
+	@Override
+	public void close() { // empty body; nothing to release
+	}
+
+	@Override
+	public String generateName(String name, List<String> scope) {
+		return name; // 'scope' is ignored; the metric name is returned unchanged
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index ffbec07..b6bdbed 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -103,6 +103,8 @@ KEY_ENV_SSH_OPTS="env.ssh.opts"
 KEY_RECOVERY_MODE="recovery.mode"
 KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 
+KEY_METRICS_JMX_PORT="metrics.jmx.port"
+
 ########################################################################################################################
 # PATHS AND CONFIG
 ########################################################################################################################
@@ -240,6 +242,10 @@ if [ -z "${RECOVERY_MODE}" ]; then
     RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
 fi
 
+if [ -z "${JMX_PORT}" ]; then
+    JMX_PORT=$(readFromConfig ${KEY_METRICS_JMX_PORT} 9010 "${YAML_CONF}")
+fi
+
 # Arguments for the JVM. Used for job and task manager JVMs.
 # DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
 # KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index 1ef7439..cc7163f 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -23,14 +23,24 @@ USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|zook
 STARTSTOP=$1
 DAEMON=$2
 ARGS=("${@:3}") # get remaining arguments as array
+JMX_ARGS=""
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
 
 case $DAEMON in
     (jobmanager)
         CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
+        if [ "${ARGS[3]}" == "local" ]; then
+            JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
+        fi
     ;;
 
     (taskmanager)
         CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
+        JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
     ;;
 
     (zookeeper)
@@ -43,11 +53,6 @@ case $DAEMON in
     ;;
 esac
 
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/config.sh
-
 if [ "$FLINK_IDENT_STRING" = "" ]; then
     FLINK_IDENT_STRING="$USER"
 fi
@@ -96,12 +101,13 @@ case $STARTSTOP in
           count="${#active[@]}"
 
           if [ ${count} -gt 0 ]; then
+            JMX_ARGS=""
             echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
           fi
         fi
 
         echo "Starting $DAEMON daemon on host $HOSTNAME."
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
+        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} ${JMX_ARGS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
 
         mypid=$!
 

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index 8ce83f3..31b3ba2 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.builder.Tuple2Builder;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
@@ -76,7 +77,7 @@ public class CoGroupOperatorCollectionTest implements Serializable {
 			final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
 			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
 			final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 4, 0);
-			final RuntimeContext ctx = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulators);
+			final RuntimeContext ctx = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulators, new DummyMetricGroup());
 
 			{
 				SumCoGroup udf1 = new SumCoGroup();

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index a563281..1b627c4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -168,9 +169,9 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
 			
 			
 			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
index a6b3deb..1d5668b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -107,9 +108,9 @@ public class InnerJoinOperatorBaseTest implements Serializable {
 			final TaskInfo taskInfo = new TaskInfo("op", 0, 1, 0);
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
 
 			assertEquals(expected, new HashSet<>(resultSafe));
 			assertEquals(expected, new HashSet<>(resultRegular));

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index c04916d..4317c03 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -145,9 +146,9 @@ public class ReduceOperatorTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
 
 			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
 			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-metrics/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml b/flink-metrics/flink-metrics-dropwizard/pom.xml
new file mode 100644
index 0000000..84d9722
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-dropwizard</artifactId>
+	<name>flink-metrics-dropwizard</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
new file mode 100644
index 0000000..a7309be
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.dropwizard;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.metrics.CounterWrapper;
+import org.apache.flink.dropwizard.metrics.GaugeWrapper;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+
+import java.util.List;
+
+/**
+ * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a
+ * Dropwizard {@link com.codahale.metrics.Reporter}.
+ */
+@PublicEvolving
+public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled {
+	protected MetricRegistry registry; // Dropwizard registry holding a wrapper for each Counter/Gauge this reporter was notified of
+	protected ScheduledReporter reporter; // assigned in open(); stays null until open() has completed
+
+	public static final String ARG_HOST = "host"; // the ARG_* constants are not read in this class; presumably configuration keys for subclasses - confirm
+	public static final String ARG_PORT = "port";
+	public static final String ARG_PREFIX = "prefix";
+	public static final String ARG_CONVERSION_RATE = "rateConversion";
+	public static final String ARG_CONVERSION_DURATION = "durationConversion";
+
+	protected ScheduledDropwizardReporter() {
+		this.registry = new MetricRegistry(); // every reporter instance gets its own registry
+	}
+
+	@Override
+	public synchronized void notifyOfAddedMetric(Metric metric, String name) { // synchronized together with notifyOfRemovedMetric() and report()
+		if (metric instanceof Counter) {
+			registry.register(name, new CounterWrapper((Counter) metric)); // register the Flink counter under 'name' via its Dropwizard wrapper
+		} else if (metric instanceof Gauge) {
+			registry.register(name, new GaugeWrapper((Gauge) metric)); // same for gauges
+		} // any other Metric type is skipped without notice
+	}
+
+	@Override
+	public synchronized void notifyOfRemovedMetric(Metric metric, String name) { // only 'name' is used; 'metric' is ignored
+		registry.remove(name);
+	}
+
+	public abstract ScheduledReporter getReporter(Configuration config); // subclasses supply the concrete Dropwizard reporter; called from open()
+
+	@Override
+	public void open(Configuration config) {
+		this.reporter = getReporter(config); // if getReporter() throws, 'reporter' stays null
+	}
+
+	@Override
+	public void close() {
+		if (this.reporter != null) { this.reporter.stop(); } // null guard: close() must not throw when open() never assigned a reporter
+	}
+
+	@Override
+	public String generateName(String name, List<String> scope) { // builds "<scope1>.<scope2>. ... .<name>"
+		StringBuilder sb = new StringBuilder();
+		for (String s : scope) {
+			sb.append(s);
+			sb.append('.'); // every scope element is followed by a dot
+		}
+		sb.append(name); // with an empty scope the result is just 'name'
+		return sb.toString();
+	}
+
+	@Override
+	public synchronized void report() { // hands the registry's current metrics, grouped by Dropwizard metric type, to the wrapped reporter
+		this.reporter.report(
+			this.registry.getGauges(),
+			this.registry.getCounters(),
+			this.registry.getHistograms(), // only counters and gauges are ever registered by this class
+			this.registry.getMeters(),
+			this.registry.getTimers());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
new file mode 100644
index 0000000..f6630b9
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+public class CounterWrapper extends com.codahale.metrics.Counter { // presents a Flink Counter as a Dropwizard Counter
+	private final Counter counter; // the wrapped org.apache.flink.metrics.Counter (the imported type, not the superclass)
+
+	public CounterWrapper(Counter counter) {
+		this.counter = counter; // stored as-is; no null check
+	}
+
+	@Override
+	public long getCount() {
+		return this.counter.getCount(); // always read from the wrapped counter; getCount() is the only method overridden here
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
new file mode 100644
index 0000000..d47090d
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+public class GaugeWrapper implements com.codahale.metrics.Gauge { // presents a Flink Gauge as a Dropwizard Gauge; no type arguments are given for either Gauge type
+	private final Gauge gauge; // the wrapped org.apache.flink.metrics.Gauge (the imported type, not the implemented interface)
+
+	public GaugeWrapper(Gauge gauge) {
+		this.gauge = gauge; // stored as-is; no null check
+	}
+
+	@Override
+	public Object getValue() {
+		return this.gauge.getValue(); // queried from the wrapped gauge on every call; nothing is cached
+	}
+}


Mime
View raw message