flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/6] flink git commit: [FLINK-5118] [metrics] Fix inconsistent numBytesIn/Out metrics for network channels
Date Mon, 23 Jan 2017 13:12:06 GMT
Repository: flink
Updated Branches:
  refs/heads/master 33780ef4f -> 3a258f7ea


[FLINK-5118] [metrics] Fix inconsistent numBytesIn/Out metrics for network channels

This closes #3106


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a258f7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a258f7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a258f7e

Branch: refs/heads/master
Commit: 3a258f7eabe0940abaca63f900a080204b32d2fc
Parents: a246ec2
Author: zentol <chesnay@apache.org>
Authored: Thu Jan 12 14:59:22 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Jan 23 14:03:04 2017 +0100

----------------------------------------------------------------------
 .../api/serialization/SpanningRecordSerializer.java      |  8 --------
 .../runtime/io/network/api/writer/RecordWriter.java      | 11 ++++++++---
 2 files changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3a258f7e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 335d12e..cb5665b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -24,7 +24,6 @@ import java.nio.ByteOrder;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -59,8 +58,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	/** Limit of current {@link MemorySegment} of target buffer */
 	private int limit;
 
-	private transient Counter numBytesOut;
-
 	public SpanningRecordSerializer() {
 		this.serializationBuffer = new DataOutputSerializer(128);
 
@@ -97,10 +94,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 		int len = this.serializationBuffer.length();
 		this.lengthBuffer.putInt(0, len);
-		
-		if (numBytesOut != null) {
-			numBytesOut.inc(len);
-		}
 
 		this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
 
@@ -203,6 +196,5 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 	@Override
 	public void instantiateMetrics(TaskIOMetricGroup metrics) {
-		numBytesOut = metrics.getNumBytesOutCounter();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3a258f7e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 9283b70..c698ff5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -58,6 +60,8 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	private final Random RNG = new XORShiftRandom();
 
+	private Counter numBytesOut = new SimpleCounter();
+
 	public RecordWriter(ResultPartitionWriter writer) {
 		this(writer, new RoundRobinChannelSelector<T>());
 	}
@@ -113,6 +117,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 				Buffer buffer = serializer.getCurrentBuffer();
 
 				if (buffer != null) {
+					numBytesOut.inc(buffer.getSize());
 					writeAndClearBuffer(buffer, targetChannel, serializer);
 
 					// If this was a full record, we are done. Not breaking
@@ -140,6 +145,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 				synchronized (serializer) {
 					Buffer buffer = serializer.getCurrentBuffer();
 					if (buffer != null) {
+						numBytesOut.inc(buffer.getSize());
 						writeAndClearBuffer(buffer, targetChannel, serializer);
 					} else if (serializer.hasData()) {
 						// sanity check
@@ -167,6 +173,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 					Buffer buffer = serializer.getCurrentBuffer();
 
 					if (buffer != null) {
+						numBytesOut.inc(buffer.getSize());
 						targetPartition.writeBuffer(buffer, targetChannel);
 					}
 				} finally {
@@ -198,9 +205,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 	 * @param metrics
      */
 	public void setMetricGroup(TaskIOMetricGroup metrics) {
-		for(RecordSerializer<?> serializer : serializers) {
-			serializer.instantiateMetrics(metrics);
-		}
+		numBytesOut = metrics.getNumBytesOutCounter();
 	}
 
 	/**


Mime
View raw message