flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/2] flink git commit: [FLINK-4773] [metrics] [refactor] Introduce OperatorIOMetricGroup
Date Tue, 25 Oct 2016 10:03:41 GMT
Repository: flink
Updated Branches:
  refs/heads/master c4f9f0d78 -> 022ceb58b


[FLINK-4773] [metrics] [refactor] Introduce OperatorIOMetricGroup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/022ceb58
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/022ceb58
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/022ceb58

Branch: refs/heads/master
Commit: 022ceb58bd6c65b2cf3426b284bc0950b0ce865d
Parents: 77258a0
Author: zentol <chesnay@apache.org>
Authored: Wed Oct 5 15:04:03 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Tue Oct 25 12:03:26 2016 +0200

----------------------------------------------------------------------
 .../metrics/groups/OperatorIOMetricGroup.java   | 44 ++++++++++++++++++++
 .../metrics/groups/OperatorMetricGroup.java     | 13 ++++++
 .../AbstractCachedBuildSideJoinDriver.java      |  4 +-
 .../operators/AbstractOuterJoinDriver.java      |  4 +-
 .../operators/AllGroupCombineDriver.java        |  4 +-
 .../runtime/operators/AllReduceDriver.java      |  4 +-
 .../flink/runtime/operators/BatchTask.java      |  5 ++-
 .../flink/runtime/operators/CoGroupDriver.java  |  4 +-
 .../flink/runtime/operators/CrossDriver.java    | 16 +++----
 .../flink/runtime/operators/DataSinkTask.java   |  3 +-
 .../flink/runtime/operators/DataSourceTask.java |  3 +-
 .../flink/runtime/operators/FlatMapDriver.java  |  4 +-
 .../runtime/operators/GroupReduceDriver.java    |  4 +-
 .../flink/runtime/operators/JoinDriver.java     |  4 +-
 .../flink/runtime/operators/MapDriver.java      |  4 +-
 .../runtime/operators/MapPartitionDriver.java   |  4 +-
 .../flink/runtime/operators/NoOpDriver.java     |  4 +-
 .../runtime/operators/ReduceCombineDriver.java  |  4 +-
 .../flink/runtime/operators/ReduceDriver.java   |  4 +-
 .../flink/runtime/operators/TaskContext.java    |  4 +-
 .../operators/UnionWithTempOperator.java        |  4 +-
 .../operators/chaining/ChainedDriver.java       |  8 ++--
 .../metrics/groups/OperatorGroupTest.java       | 19 ++++++++-
 .../operators/drivers/TestTaskContext.java      |  8 ++--
 .../testutils/BinaryOperatorTestBase.java       |  5 ++-
 .../operators/testutils/DriverTestBase.java     |  7 ++--
 .../testutils/UnaryOperatorTestBase.java        |  6 +--
 .../testutils/UnregisteredTaskMetricsGroup.java |  7 ++++
 .../api/operators/AbstractStreamOperator.java   |  3 +-
 .../runtime/io/StreamInputProcessor.java        |  3 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  3 +-
 31 files changed, 149 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
new file mode 100644
index 0000000..8a69029
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+
+/**
+ * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
+ * forwarded to the parent operator metric group.
+ */
+public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup> {
+
+	private final Counter numRecordsIn;
+	private final Counter numRecordsOut;
+
+	public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) {
+		super(parentMetricGroup);
+		numRecordsIn = parentMetricGroup.counter("numRecordsIn");
+		numRecordsOut = parentMetricGroup.counter("numRecordsOut");
+	}
+
+	public Counter getNumRecordsInCounter() {
+		return numRecordsIn;
+	}
+
+	public Counter getNumRecordsOutCounter() {
+		return numRecordsOut;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
index f1354b5..0c823ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
@@ -36,9 +36,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
 	private final String operatorName;
 
+	private final OperatorIOMetricGroup ioMetrics;
+
 	public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, String operatorName) {
 		super(registry, registry.getScopeFormats().getOperatorFormat().formatScope(checkNotNull(parent), operatorName), parent);
 		this.operatorName = operatorName;
+
+		ioMetrics = new OperatorIOMetricGroup(this);
 	}
 
 	// ------------------------------------------------------------------------
@@ -55,6 +59,15 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
 			this.parent.subtaskIndex,
 			filter.filterCharacters(this.operatorName));
 	}
+
+	/**
+	 * Returns the OperatorIOMetricGroup for this operator.
+	 *
+	 * @return OperatorIOMetricGroup for this operator.
+	 */
+	public OperatorIOMetricGroup getIOMetricGroup() {
+		return ioMetrics;
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Component Metric Group Specifics

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index 8c66cc7..3a69642 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -67,7 +67,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 	public void initialize() throws Exception {
 		TaskConfig config = this.taskContext.getTaskConfig();
 
-		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
 		
 		TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
 		TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
@@ -169,7 +169,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 
 	@Override
 	public void run() throws Exception {
-		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 		final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
 		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
index a28e27e..3987ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
@@ -88,7 +88,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
 		
 		final DriverStrategy ls = config.getDriverStrategy();
 
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
 		final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn);
 		final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn);
 		
@@ -151,7 +151,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
 	
 	@Override
 	public void run() throws Exception {
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 		
 		final FlatJoinFunction<IT1, IT2, OT> joinStub = this.taskContext.getStub();
 		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
index f0673c6..ceab46d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
@@ -94,8 +94,8 @@ public class AllGroupCombineDriver<IN, OUT> implements Driver<GroupCombineFuncti
 			LOG.debug("AllGroupCombine starting.");
 		}
 
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 
 		final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
 		TypeSerializer<IN> serializer = serializerFactory.getSerializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 13d7222..0d79f09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -107,8 +107,8 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 			LOG.debug(this.taskContext.formatLogString("AllReduce preprocessing done. Running Reducer code."));
 		}
 
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 
 		final ReduceFunction<T> stub = this.taskContext.getStub();
 		final MutableObjectIterator<T> input = this.input;

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 29f1c20..354dbac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
@@ -213,7 +214,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 	 * The accumulator map used in the RuntimeContext.
 	 */
 	protected Map<String, Accumulator<?,?>> accumulatorMap;
-	private MetricGroup metrics;
+	private OperatorMetricGroup metrics;
 
 	// --------------------------------------------------------------------------------------------
 	//                                  Task Interface
@@ -1071,7 +1072,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 	}
 
 	@Override
-	public MetricGroup getMetricGroup() {
+	public OperatorMetricGroup getMetricGroup() {
 		return metrics;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
index 43a913d..c3bd492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
@@ -97,7 +97,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements Driver<CoGroupFunction<IT1,
 			throw new Exception("Unrecognized driver strategy for CoGoup driver: " + config.getDriverStrategy().name());
 		}
 
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
 		
 		final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn);
 		final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn);
@@ -149,7 +149,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements Driver<CoGroupFunction<IT1,
 	@Override
 	public void run() throws Exception
 	{
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 
 		final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
 		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index fee0874..c3f3958 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -198,8 +198,8 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 					"First input is outer (blocking) side, second input is inner (spilling) side."));
 		}
 
-		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 
 		final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
 		final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
@@ -266,8 +266,8 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 					"First input is inner (spilling) side, second input is outer (blocking) side."));
 		}
 
-		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 
 		final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
 		final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
@@ -332,8 +332,8 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 					"First input is outer side, second input is inner (spilling) side."));
 		}
 
-		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 
 		final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
 		final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
@@ -385,8 +385,8 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 					"First input is inner (spilling) side, second input is outer side."));
 		}
 
-		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 
 		final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
 		final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index c77a9ae..4626b69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.CloseableInputProvider;
@@ -107,7 +108,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		LOG.debug(getLogString("Starting data sink operator"));
 
 		RuntimeContext ctx = createRuntimeContext();
-		final Counter numRecordsIn = ctx.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsIn = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
 		
 		if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
 			((RichOutputFormat) this.format).setRuntimeContext(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index c062bf8..4dc3ef5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
@@ -102,7 +103,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 
 		RuntimeContext ctx = createRuntimeContext();
 		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
-		Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut");
+		Counter numRecordsOut = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter();
 
 		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
 			((RichInputFormat) this.format).setRuntimeContext(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
index 5b4a6ca..1a8f813 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
@@ -85,8 +85,8 @@ public class FlatMapDriver<IT, OT> implements Driver<FlatMapFunction<IT, OT>, OT
 
 	@Override
 	public void run() throws Exception {
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 		// cache references on the stack
 		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
 		final FlatMapFunction<IT, OT> function = this.taskContext.getStub();

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index ccd88ec..b6067e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -92,7 +92,7 @@ public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT,
 		if (config.getDriverStrategy() != DriverStrategy.SORTED_GROUP_REDUCE) {
 			throw new Exception("Unrecognized driver strategy for GroupReduce driver: " + config.getDriverStrategy().name());
 		}
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
 		
 		this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer();
 		this.comparator = this.taskContext.getDriverComparator(0);
@@ -111,7 +111,7 @@ public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT,
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(this.taskContext.formatLogString("GroupReducer preprocessing done. Running GroupReducer code."));
 		}
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 
 		// cache references on the stack
 		final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index 8543723..551bbff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -88,7 +88,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
 	public void prepare() throws Exception{
 		final TaskConfig config = this.taskContext.getTaskConfig();
 
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
 		
 		// obtain task manager's memory manager and I/O manager
 		final MemoryManager memoryManager = this.taskContext.getMemoryManager();
@@ -214,7 +214,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
 
 	@Override
 	public void run() throws Exception {
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 		final FlatJoinFunction<IT1, IT2, OT> joinStub = this.taskContext.getStub();
 		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 		final JoinTaskIterator<IT1, IT2, OT> joinIterator = this.joinIterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index 65f9061..8661851 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -80,8 +80,8 @@ public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> {
 
 	@Override
 	public void run() throws Exception {
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 		// cache references on the stack
 		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
 		final MapFunction<IT, OT> function = this.taskContext.getStub();

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
index 3496e14..0b6461e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
@@ -86,8 +86,8 @@ public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<I
 
 	@Override
 	public void run() throws Exception {
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 		// cache references on the stack
 		final MutableObjectIterator<IT> input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn);
 		final MapPartitionFunction<IT, OT> function = this.taskContext.getStub();

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index dd64b76..c483a5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -77,8 +77,8 @@ public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> {
 	@Override
 	public void run() throws Exception {
 		// cache references on the stack
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 		final MutableObjectIterator<T> input = this.taskContext.getInput(0);
 		final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index aea7ae8..e840ecc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -111,7 +111,7 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
 
 	@Override
 	public void prepare() throws Exception {
-		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 
 		strategy = taskContext.getTaskConfig().getDriverStrategy();
 
@@ -159,7 +159,7 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
 			LOG.debug("Combiner starting.");
 		}
 
-		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
 
 		final MutableObjectIterator<T> in = taskContext.getInput(0);
 		final TypeSerializer<T> serializer = this.serializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index eb4f2f5..8b09939 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -108,8 +108,8 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 			LOG.debug(this.taskContext.formatLogString("Reducer preprocessing done. Running Reducer code."));
 		}
 
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 
 		// cache references on the stack
 		final MutableObjectIterator<T> input = this.input;

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
index bc3e4c1..73fa3cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
@@ -22,10 +22,10 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
@@ -69,5 +69,5 @@ public interface TaskContext<S, OT> {
 	
 	String formatLogString(String message);
 	
-	MetricGroup getMetricGroup();
+	OperatorMetricGroup getMetricGroup();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
index 3d52925..8402b10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
@@ -60,8 +60,8 @@ public class UnionWithTempOperator<T> implements Driver<Function, T> {
 
 	@Override
 	public void run() throws Exception {
-		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
-		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
 		
 		final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 		T reuse = this.taskContext.<T>getInputSerializer(STREAMED_INPUT).getSerializer().createInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 2560135..cf62dfa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -23,9 +23,9 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -54,7 +54,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 
 	protected boolean objectReuseEnabled = false;
 	
-	protected MetricGroup metrics;
+	protected OperatorMetricGroup metrics;
 	
 	protected Counter numRecordsIn;
 	
@@ -69,8 +69,8 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 		this.taskName = taskName;
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
-		this.numRecordsIn = this.metrics.counter("numRecordsIn");
-		this.numRecordsOut = this.metrics.counter("numRecordsOut");
+		this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
+		this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
 		this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
 
 		Environment env = parent.getEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 7f82d21..af73c27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -19,14 +19,12 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
-
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -60,6 +58,23 @@ public class OperatorGroupTest extends TestLogger {
 	}
 
 	@Test
+	public void testIOMetricGroupInstantiation() {
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+
+		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+		TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+		TaskMetricGroup taskGroup = new TaskMetricGroup(
+			registry, jmGroup, new AbstractID(), new AbstractID(), "aTaskName", 11, 0);
+		OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName");
+
+		assertNotNull(opGroup.getIOMetricGroup());
+		assertNotNull(opGroup.getIOMetricGroup().getNumRecordsInCounter());
+		assertNotNull(opGroup.getIOMetricGroup().getNumRecordsOutCounter());
+
+		registry.shutdown();
+	}
+
+	@Test
 	public void testVariables() {
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 62110a7..5acc915 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -25,14 +25,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
@@ -226,7 +226,7 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> {
 	}
 
 	@Override
-	public MetricGroup getMetricGroup() {
-		return new UnregisteredMetricsGroup();
+	public OperatorMetricGroup getMetricGroup() {
+		return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 75f960e..47a7953 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.TaskContext;
@@ -371,8 +372,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
 	}
 	
 	@Override
-	public MetricGroup getMetricGroup() {
-		return new UnregisteredMetricsGroup();
+	public OperatorMetricGroup getMetricGroup() {
+		return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 088435a..c9fa664 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.TaskContext;
@@ -368,8 +367,8 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Ta
 	}
 	
 	@Override
-	public MetricGroup getMetricGroup() {
-		return new UnregisteredMetricsGroup();
+	public OperatorMetricGroup getMetricGroup() {
+		return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index a94e694..19744e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -26,11 +26,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.ResettableDriver;
@@ -363,8 +363,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
 	}
 	
 	@Override
-	public MetricGroup getMetricGroup() {
-		return null;
+	public OperatorMetricGroup getMetricGroup() {
+		return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index ae87085..31a3336 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -23,6 +23,7 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -72,4 +73,10 @@ public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
 			super(new UnregisteredTaskMetricsGroup());
 		}
 	}
+
+	public static class DummyOperatorMetricGroup extends OperatorMetricGroup {
+		public DummyOperatorMetricGroup() {
+			super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup(), "testoperator");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f2da9da..5b66466 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DefaultKeyedStateStore;
@@ -159,7 +160,7 @@ public abstract class AbstractStreamOperator<OUT>
 		this.config = config;
 		
 		this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
-		this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
+		this.output = new CountingOutput(output, ((OperatorMetricGroup) this.metrics).getIOMetricGroup().getNumRecordsOutCounter());
 		Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
 		int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
 		if (historySize <= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index aee0c70..5a27564 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -129,7 +130,7 @@ public class StreamInputProcessor<IN> {
 			return false;
 		}
 		if (numRecordsIn == null) {
-			numRecordsIn = streamOperator.getMetricGroup().counter("numRecordsIn");
+			numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
 		}
 
 		while (true) {

http://git-wip-us.apache.org/repos/asf/flink/blob/022ceb58/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index d02b066..2a4a065 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
@@ -335,7 +336,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 
 		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
 			this.operator = operator;
-			this.numRecordsIn = operator.getMetricGroup().counter("numRecordsIn");
+			this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
 		}
 
 		@Override


Mime
View raw message