flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/2] flink git commit: [FLINK-3949] Collect Metrics in Runtime Operators
Date Thu, 16 Jun 2016 10:34:20 GMT
Repository: flink
Updated Branches:
  refs/heads/master 104958523 -> c78b3c49e


http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
index f51cb68..86be7b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
@@ -76,6 +76,7 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	@Override
 	public void collect(IT record) {
 		try {
+			this.numRecordsIn.inc();
 			this.mapper.flatMap(record, this.outputCollector);
 		} catch (Exception ex) {
 			throw new ExceptionInChainedStubException(this.taskName, ex);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
index 9b888f2..cef1b73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
@@ -75,6 +75,7 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	@Override
 	public void collect(IT record) {
 		try {
+			this.numRecordsIn.inc();
 			this.outputCollector.collect(this.mapper.map(record));
 		} catch (Exception ex) {
 			throw new ExceptionInChainedStubException(this.taskName, ex);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
index 3912b98..e3de1c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
@@ -59,6 +59,7 @@ public class ChainedTerminationCriterionDriver<IT, OT> extends ChainedDriver<IT,
 
 	@Override
 	public void collect(IT record) {
+		numRecordsIn.inc();
 		agg.aggregate(1);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
index 63f2b20..e6c8c2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
@@ -167,6 +167,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 
 	@Override
 	public void collect(IN record) {
+		numRecordsIn.inc();
 		// try writing to the sorter first
 		try {
 			if (this.sorter.write(record)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 408abc2..a003d9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -163,6 +163,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 
 	@Override
 	public void collect(IN record) {
+		this.numRecordsIn.inc();
 		// try writing to the sorter first
 		try {
 			if (this.sorter.write(record)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
new file mode 100644
index 0000000..f7a1df9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Collector;
+
+public class CountingCollector<OUT> implements Collector<OUT> {
+	private final Collector<OUT> collector;
+	private final Counter numRecordsOut;
+
+	public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
+		this.collector = collector;
+		this.numRecordsOut = numRecordsOut;
+	}
+
+	@Override
+	public void collect(OUT record) {
+		this.numRecordsOut.inc();
+		this.collector.collect(record);
+	}
+
+	@Override
+	public void close() {
+		this.collector.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java
new file mode 100644
index 0000000..7494108
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+import java.util.Iterator;
+
+public class CountingIterable<IN> implements Iterable<IN> {
+
+	private final Iterable<IN> iterable;
+	private final Counter numRecordsIn;
+
+	public CountingIterable(Iterable<IN> iterable, Counter numRecordsIn) {
+		this.iterable = iterable;
+		this.numRecordsIn = numRecordsIn;
+	}
+
+	@Override
+	public Iterator<IN> iterator() {
+		return new CountingIterator<>(this.iterable.iterator(), this.numRecordsIn);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java
new file mode 100644
index 0000000..fe89358
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+import java.util.Iterator;
+
+public class CountingIterator<IN> implements Iterator<IN> {
+	private final Iterator<IN> iterator;
+	private final Counter numRecordsIn;
+
+	public CountingIterator(Iterator<IN> iterator, Counter numRecordsIn) {
+		this.iterator = iterator;
+		this.numRecordsIn = numRecordsIn;
+	}
+
+	@Override
+	public boolean hasNext() {
+		return this.iterator.hasNext();
+	}
+
+	@Override
+	public IN next() {
+		numRecordsIn.inc();
+		return this.iterator.next();
+	}
+
+	@Override
+	public void remove() {
+		this.iterator.remove();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java
new file mode 100644
index 0000000..e4b436a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+
+public class CountingMutableObjectIterator<IN> implements MutableObjectIterator<IN> {
+	private final MutableObjectIterator<IN> iterator;
+	private final Counter numRecordsIn;
+
+	public CountingMutableObjectIterator(MutableObjectIterator<IN> iterator, Counter numRecordsIn) {
+		this.iterator = iterator;
+		this.numRecordsIn = numRecordsIn;
+	}
+
+	@Override
+	public IN next(IN reuse) throws IOException {
+		IN next = iterator.next(reuse);
+		if (next != null) {
+			numRecordsIn.inc();
+		}
+		return next;
+	}
+
+	@Override
+	public IN next() throws IOException {
+		IN next = iterator.next();
+		if (next != null) {
+			numRecordsIn.inc();
+		}
+		return next;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 017b16b..58eb90c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -319,7 +319,8 @@ public class Task implements Runnable {
 
 		for (int i = 0; i < this.inputGates.length; i++) {
 			SingleInputGate gate = SingleInputGate.create(
-					taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment);
+					taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment, 
+					metricGroup.getIOMetricGroup());
 
 			this.inputGates[i] = gate;
 			inputGatesById.put(gate.getConsumedResultId(), gate);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 9724a80..6853722 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -190,9 +189,5 @@ public class AbstractReaderTest {
 		public void setReporter(AccumulatorRegistry.Reporter reporter) {
 
 		}
-
-		@Override
-		public void setMetricGroup(IOMetricGroup metrics) {
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index 9717530..da15f08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -118,7 +119,7 @@ public class InputChannelTest {
 				ResultPartitionID partitionId,
 				Tuple2<Integer, Integer> initialAndMaxBackoff) {
 
-			super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
+			super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter());
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 8852b4f..f91a4ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -271,7 +272,8 @@ public class LocalInputChannelTest {
 				new ResultPartitionID(),
 				partitionManager,
 				mock(TaskEventDispatcher.class),
-				initialAndMaxRequestBackoff);
+				initialAndMaxRequestBackoff,
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 	}
 
 	/**
@@ -346,7 +348,8 @@ public class LocalInputChannelTest {
 					new IntermediateDataSetID(),
 					subpartitionIndex,
 					numberOfInputChannels,
-					mock(PartitionStateChecker.class));
+					mock(PartitionStateChecker.class),
+					new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 			// Set buffer pool
 			inputGate.setBufferPool(bufferPool);
@@ -360,7 +363,8 @@ public class LocalInputChannelTest {
 								i,
 								consumedPartitionIds[i],
 								partitionManager,
-								taskEventDispatcher));
+								taskEventDispatcher,
+								new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()));
 			}
 
 			this.numberOfInputChannels = numberOfInputChannels;

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index c484cc4..9eb49ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 import scala.Tuple2;
 
@@ -247,7 +248,8 @@ public class RemoteInputChannelTest {
 				0,
 				partitionId,
 				mock(ConnectionID.class),
-				connectionManager);
+				connectionManager,
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		ch.onFailedPartitionRequest();
 
@@ -266,7 +268,8 @@ public class RemoteInputChannelTest {
 				0,
 				new ResultPartitionID(),
 				mock(ConnectionID.class),
-				connManager);
+				connManager,
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception.")));
 
@@ -301,6 +304,7 @@ public class RemoteInputChannelTest {
 				new ResultPartitionID(),
 				mock(ConnectionID.class),
 				connectionManager,
-				initialAndMaxRequestBackoff);
+				initialAndMaxRequestBackoff,
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index c4bb785..05427a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 import scala.Tuple2;
 
@@ -66,7 +67,7 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class));
+				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 				new TestInputChannel(inputGate, 0),
@@ -113,7 +114,7 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class));
+		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -122,12 +123,12 @@ public class SingleInputGateTest {
 		// Local
 		ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher);
+		InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		// Unknown
 		ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0));
+		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		// Set channels
 		inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
@@ -168,7 +169,7 @@ public class SingleInputGateTest {
 				new IntermediateDataSetID(),
 				0,
 				1,
-				mock(PartitionStateChecker.class));
+				mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
@@ -179,7 +180,7 @@ public class SingleInputGateTest {
 				partitionManager,
 				new TaskEventDispatcher(),
 				new LocalConnectionManager(),
-				new Tuple2<>(0, 0));
+				new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 
@@ -208,7 +209,8 @@ public class SingleInputGateTest {
 				new IntermediateDataSetID(),
 				0,
 				1,
-				mock(PartitionStateChecker.class));
+				mock(PartitionStateChecker.class),
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		InputChannel unknown = new UnknownInputChannel(
 				inputGate,
@@ -217,7 +219,8 @@ public class SingleInputGateTest {
 				new ResultPartitionManager(),
 				new TaskEventDispatcher(),
 				new LocalConnectionManager(),
-				new Tuple2<>(0, 0));
+				new Tuple2<Integer, Integer>(0, 0),
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 640c11a..607da94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -59,7 +60,7 @@ public class TestSingleInputGate {
 		checkArgument(numberOfInputChannels >= 1);
 
 		SingleInputGate realGate = new SingleInputGate(
-				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class));
+				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		this.inputGate = spy(realGate);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index d8714d1..28f621f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -43,8 +44,8 @@ public class UnionInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final String testTaskName = "Test Task";
-		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class));
-		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class));
+		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index a2edce2..6d3f768 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.metrics.groups.JobMetricGroup;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
@@ -65,4 +66,19 @@ public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
 			super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob");
 		}
 	}
+	
+	public static class DummyIOMetricGroup extends IOMetricGroup {
+		public DummyIOMetricGroup() {
+			super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup());
+		}
+
+		@Override
+		protected void addMetric(String name, Metric metric) {
+		}
+
+		@Override
+		public MetricGroup addGroup(String name) {
+			return new UnregisteredMetricsGroup();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 326a42f..dc7bbdb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -25,10 +25,12 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -102,10 +104,10 @@ public abstract class AbstractStreamOperator<OUT>
 	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
 		this.container = containingTask;
 		this.config = config;
-		this.output = output;
 		String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
 		
 		this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
+		this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
 		this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
 
 		stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
@@ -334,4 +336,30 @@ public abstract class AbstractStreamOperator<OUT>
 	public void disableInputCopy() {
 		this.inputCopyDisabled = true;
 	}
+
+	public class CountingOutput implements Output<StreamRecord<OUT>> {
+		private final Output<StreamRecord<OUT>> output;
+		private final Counter numRecordsOut;
+
+		public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
+			this.output = output;
+			this.numRecordsOut = counter;
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		public void collect(StreamRecord<OUT> record) {
+			numRecordsOut.inc();
+			output.collect(record);
+		}
+
+		@Override
+		public void close() {
+			output.close();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 84f59ed..68c623e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -51,6 +51,10 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 		this.chainingStrategy = ChainingStrategy.HEAD;
 	}
 
+	public void run(final Object lockingObject) throws Exception {
+		run(lockingObject, output);
+	}
+
 	
 	public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
 		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index ab69ab7..33a0407 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -81,6 +83,8 @@ public class StreamInputProcessor<IN> {
 
 	private final DeserializationDelegate<StreamElement> deserializationDelegate;
 
+	private Counter numRecordsIn;
+
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
 								EventListener<CheckpointBarrier> checkpointListener,
@@ -133,6 +137,9 @@ public class StreamInputProcessor<IN> {
 		if (isFinished) {
 			return false;
 		}
+		if (numRecordsIn == null) {
+			numRecordsIn = streamOperator.getMetricGroup().counter("numRecordsIn");
+		}
 
 		while (true) {
 			if (currentRecordDeserializer != null) {
@@ -166,6 +173,7 @@ public class StreamInputProcessor<IN> {
 						// now we can do the actual processing
 						StreamRecord<IN> record = recordOrWatermark.asRecord();
 						synchronized (lock) {
+							numRecordsIn.inc();
 							streamOperator.setKeyContextElement1(record);
 							streamOperator.processElement(record);
 						}
@@ -211,9 +219,12 @@ public class StreamInputProcessor<IN> {
 	 * @param metrics metric group
      */
 	public void setMetricGroup(IOMetricGroup metrics) {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			deserializer.instantiateMetrics(metrics);
-		}
+		metrics.gauge("currentLowWatermark", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return lastEmittedWatermark;
+			}
+		});
 	}
 	
 	public void cleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 733e7fb..1a66934 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -287,9 +288,12 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 	 * @param metrics metric group
 	 */
 	public void setMetricGroup(IOMetricGroup metrics) {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			deserializer.instantiateMetrics(metrics);
-		}
+		metrics.gauge("currentLowWatermark", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return Math.min(lastEmittedWatermark1, lastEmittedWatermark2);
+			}
+		});
 	}
 	
 	public void cleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 90abea4..761aa37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -298,14 +299,17 @@ public class OperatorChain<OUT> {
 	private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
 		
 		protected final OneInputStreamOperator<T, ?> operator;
+		protected final Counter numRecordsIn;
 
 		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
 			this.operator = operator;
+			this.numRecordsIn = operator.getMetricGroup().counter("numRecordsIn");
 		}
 
 		@Override
 		public void collect(StreamRecord<T> record) {
 			try {
+				numRecordsIn.inc();
 				operator.setKeyContextElement1(record);
 				operator.processElement(record);
 			}
@@ -347,6 +351,7 @@ public class OperatorChain<OUT> {
 		@Override
 		public void collect(StreamRecord<T> record) {
 			try {
+				numRecordsIn.inc();
 				StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
 				operator.setKeyContextElement1(copy);
 				operator.processElement(copy);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index af9278f..7ae99f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -53,7 +53,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 	@Override
 	protected void run() throws Exception {
-		headOperator.run(getCheckpointLock(), getHeadOutput());
+		headOperator.run(getCheckpointLock());
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index af7d3f9..58e3cb8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -56,13 +57,27 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		
 		LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
 		
-		this.headOperator = new RecordPusher<>(dataChannel, iterationWaitTime);
+		this.headOperator = new RecordPusher<>();
+		this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
 	}
 
 	private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
 		
 		private static final long serialVersionUID = 1L;
 
+		@Override
+		public void processElement(StreamRecord<IN> record) throws Exception {
+			output.collect(record);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) {
+			// ignore
+		}
+	}
+
+	private static class IterationTailOutput<IN> implements Output<StreamRecord<IN>> {
+
 		@SuppressWarnings("NonSerializableFieldInSerializableClass")
 		private final BlockingQueue<StreamRecord<IN>> dataChannel;
 		
@@ -70,25 +85,32 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		
 		private final boolean shouldWait;
 
-		RecordPusher(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
+		IterationTailOutput(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
 			this.dataChannel = dataChannel;
 			this.iterationWaitTime = iterationWaitTime;
 			this.shouldWait =  iterationWaitTime > 0;
 		}
 
 		@Override
-		public void processElement(StreamRecord<IN> record) throws Exception {
-			if (shouldWait) {
-				dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
-			}
-			else {
-				dataChannel.put(record);
+		public void emitWatermark(Watermark mark) {
+		}
+
+		@Override
+		public void collect(StreamRecord<IN> record) {
+			try {
+				if (shouldWait) {
+					dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
+				}
+				else {
+					dataChannel.put(record);
+				}
+			} catch (InterruptedException e) {
+				throw new RuntimeException(e);
 			}
 		}
 
 		@Override
-		public void processWatermark(Watermark mark) {
-			// ignore
+		public void close() {
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a771c85..444245c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -154,6 +155,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 	private long recoveryTimestamp;
 
+	private long lastCheckpointSize = 0;
+
 	// ------------------------------------------------------------------------
 	//  Life cycle methods for specific implementations
 	// ------------------------------------------------------------------------
@@ -194,6 +197,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			// allow trigger tasks to be removed if all timers for that timestamp are removed by user
 			timerService.setRemoveOnCancelPolicy(true);
 
+			getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() {
+				@Override
+				public Long getValue() {
+					return StreamTask.this.lastCheckpointSize;
+				}
+			});
+
 			// task specific initialization
 			init();
 			
@@ -538,6 +548,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				if (allStates.isEmpty()) {
 					getEnvironment().acknowledgeCheckpoint(checkpointId);
 				} else if (!hasAsyncStates) {
+					this.lastCheckpointSize = allStates.getStateSize();
 					getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
 				} else {
 					// start a Thread that does the asynchronous materialization and
@@ -572,6 +583,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 									}
 								}
 								StreamTaskStateList allStates = new StreamTaskStateList(states);
+								StreamTask.this.lastCheckpointSize = allStates.getStateSize();
 								getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
 								LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
 							}


Mime
View raw message