flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [hotfix] [network] Add DEBUG log messages to intermediate results
Date Wed, 13 Jul 2016 15:27:25 GMT
Repository: flink
Updated Branches:
  refs/heads/master d17fe4f63 -> 565f94128


[hotfix] [network] Add DEBUG log messages to intermediate results

This adds log messages about created result partition consumers and
spilling.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/565f9412
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/565f9412
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/565f9412

Branch: refs/heads/master
Commit: 565f94128fe7938afeb69bcc75963a340fe9a82b
Parents: d17fe4f
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Jul 13 17:15:11 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Jul 13 17:16:29 2016 +0200

----------------------------------------------------------------------
 .../network/partition/PipelinedSubpartitionView.java   |  7 +++++++
 .../runtime/io/network/partition/ResultPartition.java  |  6 +++++-
 .../io/network/partition/SpillableSubpartition.java    |  8 ++++++--
 .../network/partition/SpillableSubpartitionView.java   |  7 +++++++
 .../partition/SpilledSubpartitionViewAsyncIO.java      | 13 +++++++++++++
 .../partition/SpilledSubpartitionViewSyncIO.java       | 13 +++++++++++++
 6 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/565f9412/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 86c241c..f8d81a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -75,4 +75,11 @@ class PipelinedSubpartitionView implements ResultSubpartitionView {
 	public Throwable getFailureCause() {
 		return parent.getFailureCause();
 	}
+
+	@Override
+	public String toString() {
+		return String.format("PipelinedSubpartitionView(index: %d) of ResultPartition %s",
+				parent.index,
+				parent.parent.getPartitionId());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/565f9412/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 0068fe0..7c109f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -345,7 +345,11 @@ public class ResultPartition implements BufferPoolOwner {
 
 		checkElementIndex(index, subpartitions.length, "Subpartition not found.");
 
-		return subpartitions[index].createReadView(bufferProvider);
+		ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider);
+
+		LOG.debug("Created {}", readView);
+
+		return readView;
 	}
 
 	public Throwable getFailureCause() {

http://git-wip-us.apache.org/repos/asf/flink/blob/565f9412/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 556f1d2..3e4692a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -152,12 +152,16 @@ class SpillableSubpartition extends ResultSubpartition {
 
 				final int numberOfBuffers = buffers.size();
 
+				long spilledBytes = 0;
+
 				// Spill all buffers
 				for (int i = 0; i < numberOfBuffers; i++) {
-					spillWriter.writeBlock(buffers.remove(0));
+					Buffer buffer = buffers.remove(0);
+					spilledBytes += buffer.getSize();
+					spillWriter.writeBlock(buffer);
 				}
 
-				LOG.debug("Spilling {} buffers of {}.", numberOfBuffers, this);
+				LOG.debug("Spilled {} bytes for sub partition {} of {}.", spilledBytes, index, parent.getPartitionId());
 
 				return numberOfBuffers;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/565f9412/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 39427cf..29c2002 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -169,4 +169,11 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 	public Throwable getFailureCause() {
 		return parent.getFailureCause();
 	}
+
+	@Override
+	public String toString() {
+		return String.format("SpillableSubpartitionView(index: %d) of ResultPartition %s",
+				parent.index,
+				parent.parent.getPartitionId());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/565f9412/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
index d63f7d5..daccd28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
@@ -81,6 +81,9 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
 	/** Flag indicating whether we reached EOF at the file reader. */
 	private volatile boolean hasReachedEndOfFile;
 
+	/** Spilled file size */
+	private final long fileSize;
+
 	SpilledSubpartitionViewAsyncIO(
 			ResultSubpartition parent,
 			BufferProvider bufferProvider,
@@ -114,6 +117,8 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
 
 		this.readBatchSize = readBatchSize;
 
+		this.fileSize = asyncFileReader.getSize();
+
 		// Trigger the initial read requests
 		readNextBatchAsync();
 	}
@@ -347,6 +352,14 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
 		}
 	}
 
+	@Override
+	public String toString() {
+		return String.format("SpilledSubpartitionView[async](index: %d, file size: %d bytes) of ResultPartition %s",
+				parent.index,
+				fileSize,
+				parent.parent.getPartitionId());
+	}
+
 	/**
 	 * Callback from the buffer provider.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/565f9412/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
index 927d1e9..c86697f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
@@ -54,6 +54,9 @@ class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
 	/** Flag indicating whether all resources have been released. */
 	private AtomicBoolean isReleased = new AtomicBoolean();
 
+	/** Spilled file size */
+	private final long fileSize;
+
 	SpilledSubpartitionViewSyncIO(
 			ResultSubpartition parent,
 			int memorySegmentSize,
@@ -71,6 +74,8 @@ class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
 		if (initialSeekPosition > 0) {
 			fileReader.seekToPosition(initialSeekPosition);
 		}
+
+		this.fileSize = fileReader.getSize();
 	}
 
 	@Override
@@ -117,6 +122,14 @@ class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
 		return parent.getFailureCause();
 	}
 
+	@Override
+	public String toString() {
+		return String.format("SpilledSubpartitionView[sync](index: %d, file size: %d bytes) of ResultPartition %s",
+				parent.index,
+				fileSize,
+				parent.parent.getPartitionId());
+	}
+
 	/**
 	 * A buffer pool to provide buffer to read the file into.
 	 *


Mime
View raw message