flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/6] flink git commit: [FLINK-8734][network] fix partition bytes counting and re-enable in tests
Date Tue, 27 Feb 2018 08:22:18 GMT
Repository: flink
Updated Branches:
  refs/heads/master 915213c7a -> 9fb1c23aa


[FLINK-8734][network] fix partition bytes counting and re-enable in tests

This closes #5550.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9daf9cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9daf9cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9daf9cc

Branch: refs/heads/master
Commit: f9daf9cc4243a80b38a1f81bf2b9b37565fe2d61
Parents: 4bf76ae
Author: Nico Kruber <nico@data-artisans.com>
Authored: Tue Feb 20 18:05:54 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Feb 27 09:07:13 2018 +0100

----------------------------------------------------------------------
 .../partition/SpillableSubpartitionView.java    |  7 +++-
 .../partition/PipelinedSubpartitionTest.java    | 16 ++++----
 .../partition/SpillableSubpartitionTest.java    | 41 ++++++++++++--------
 .../network/partition/SubpartitionTestBase.java |  3 ++
 4 files changed, 42 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 789b3d0..b821dcf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -18,15 +18,17 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -115,6 +117,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 						checkState(bufferConsumer.isFinished(), "BufferConsumer must be finished before " +
 							"spilling. Otherwise we would not be able to simply remove it from the queue. This
should " +
 							"be guaranteed by creating ResultSubpartitionView only once Subpartition isFinished.");
+						parent.updateStatistics(buffer);
 						spilledBytes += buffer.getSize();
 						spillWriter.writeBlock(buffer);
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 2ca01c8..528f0e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -207,8 +207,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		assertEquals(1, subpartition.getTotalNumberOfBuffers());
 		assertEquals(1, subpartition.getBuffersInBacklog());
-		// TODO: re-enable?
-//		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+		assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the
buffer
 
 		// ...should have resulted in a notification
 		verify(listener, times(1)).notifyDataAvailable();
@@ -218,6 +217,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		BufferAndBacklog read = view.getNextBuffer();
 		assertNotNull(read);
 		assertTrue(read.buffer().isBuffer());
+		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting
the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
 		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertFalse(read.nextBufferIsEvent());
@@ -231,14 +231,14 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase
{
 
 		assertEquals(2, subpartition.getTotalNumberOfBuffers());
 		assertEquals(1, subpartition.getBuffersInBacklog());
-		// TODO: re-enable?
-//		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting
the buffer
 		verify(listener, times(2)).notifyDataAvailable();
 
 		assertFalse(view.nextBufferIsEvent());
 		read = view.getNextBuffer();
 		assertNotNull(read);
 		assertTrue(read.buffer().isBuffer());
+		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when
getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
 		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertFalse(read.nextBufferIsEvent());
@@ -258,14 +258,14 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase
{
 
 		assertEquals(5, subpartition.getTotalNumberOfBuffers());
 		assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
-		// TODO: re-enable?
-//		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when
getting the buffer
 		verify(listener, times(4)).notifyDataAvailable();
 
 		assertFalse(view.nextBufferIsEvent()); // the first buffer
 		read = view.getNextBuffer();
 		assertNotNull(read);
 		assertTrue(read.buffer().isBuffer());
+		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when
getting the buffer
 		assertEquals(1, subpartition.getBuffersInBacklog());
 		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertTrue(read.nextBufferIsEvent());
@@ -274,6 +274,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		read = view.getNextBuffer();
 		assertNotNull(read);
 		assertFalse(read.buffer().isBuffer());
+		assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when
getting the buffer
 		assertEquals(1, subpartition.getBuffersInBacklog());
 		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertFalse(read.nextBufferIsEvent());
@@ -282,6 +283,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		read = view.getNextBuffer();
 		assertNotNull(read);
 		assertTrue(read.buffer().isBuffer());
+		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when
getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
 		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertFalse(read.nextBufferIsEvent());
@@ -473,6 +475,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 			Assert.fail("buffer 2 not recycled");
 		}
 		assertEquals(2, partition.getTotalNumberOfBuffers());
-		//assertEquals(2 * 4096, partition.getTotalNumberOfBytes());
+		assertEquals(0, partition.getTotalNumberOfBytes()); // buffer data is never consumed
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 65d98e6..43bcd31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -199,8 +199,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
 		assertEquals(4, partition.getTotalNumberOfBuffers());
 		assertEquals(3, partition.getBuffersInBacklog());
-		//TODO: re-enable this?
-//		assertEquals(BUFFER_DATA_SIZE * 4, partition.getTotalNumberOfBytes());
+		assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when getting/releasing
the buffers
 
 		assertFalse(bufferConsumer.isRecycled());
 		assertEquals(4, partition.releaseMemory());
@@ -305,8 +304,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
 		assertEquals(5, partition.getTotalNumberOfBuffers());
 		assertEquals(3, partition.getBuffersInBacklog());
-		//TODO: re-enable this?
-//		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes());
+		assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when getting/spilling
the buffers
 
 		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
 		SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener);
@@ -319,6 +317,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		BufferAndBacklog read = reader.getNextBuffer(); // first buffer (non-spilled)
 		assertNotNull(read);
 		assertTrue(read.buffer().isBuffer());
+		assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when
getting/spilling the buffers
 		assertEquals(2, partition.getBuffersInBacklog());
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		read.buffer().recycleBuffer();
@@ -332,8 +331,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		// still same statistics:
 		assertEquals(5, partition.getTotalNumberOfBuffers());
 		assertEquals(2, partition.getBuffersInBacklog());
-		//TODO: re-enable this?
-//		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes());
+		// only updated when getting/spilling the buffers but without the nextBuffer (kept in memory)
+		assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes());
 
 		listener.awaitNotifications(3, 30_000);
 		assertEquals(3, listener.getNumNotifications());
@@ -342,6 +341,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		read = reader.getNextBuffer();
 		assertNotNull(read);
 		assertTrue(read.buffer().isBuffer());
+		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // finally integrates
the nextBuffer statistics
 		assertEquals(1, partition.getBuffersInBacklog());
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		read.buffer().recycleBuffer();
@@ -353,6 +353,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		read = reader.getNextBuffer();
 		assertNotNull(read);
 		assertFalse(read.buffer().isBuffer());
+		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated
during spilling
 		assertEquals(1, partition.getBuffersInBacklog());
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		read.buffer().recycleBuffer();
@@ -362,6 +363,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		read = reader.getNextBuffer();
 		assertNotNull(read);
 		assertTrue(read.buffer().isBuffer());
+		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated
during spilling
 		assertEquals(0, partition.getBuffersInBacklog());
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertFalse(read.buffer().isRecycled());
@@ -373,6 +375,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertTrue(reader.nextBufferIsEvent());
 		read = reader.getNextBuffer();
 		assertNotNull(read);
+		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated
during spilling
 		assertEquals(0, partition.getBuffersInBacklog());
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertEquals(EndOfPartitionEvent.class,
@@ -421,8 +424,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		partition.finish();
 		// finish adds an EndOfPartitionEvent
 		assertEquals(1, partition.getTotalNumberOfBuffers());
-		//TODO: re-enable this?
-//		assertEquals(4, partition.getTotalNumberOfBytes());
+		// if not spilled, statistics are only updated when consuming the buffers
+		assertEquals(spilled ? 4 : 0, partition.getTotalNumberOfBytes());
 
 		BufferConsumer buffer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
 		try {
@@ -435,8 +438,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		}
 		// still same statistics
 		assertEquals(1, partition.getTotalNumberOfBuffers());
-		//TODO: re-enable this?
-//		assertEquals(4, partition.getTotalNumberOfBytes());
+		// if not spilled, statistics are only updated when consuming the buffers
+		assertEquals(spilled ? 4 : 0, partition.getTotalNumberOfBytes());
 	}
 
 	@Test
@@ -546,13 +549,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase
{
 			assertFalse("buffer1 should not be recycled (still in the queue)", buffer1.isRecycled());
 			assertFalse("buffer2 should not be recycled (still in the queue)", buffer2.isRecycled());
 			assertEquals(2, partition.getTotalNumberOfBuffers());
-			//TODO: re-enable this?
-//			assertEquals(BUFFER_DATA_SIZE * 2, partition.getTotalNumberOfBytes());
+			assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when buffers are consumed
or spilled
 
 			if (createView) {
 				// Create a read view
 				partition.finish();
 				partition.createReadView(new NoOpBufferAvailablityListener());
+				assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when buffers are
consumed or spilled
 			}
 
 			// one instance of the buffers is placed in the view's nextBuffer and not released
@@ -571,8 +574,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		}
 		// note: a view requires a finished partition which has an additional EndOfPartitionEvent
 		assertEquals(2 + (createView ? 1 : 0), partition.getTotalNumberOfBuffers());
-		//TODO: re-enable this?
-//		assertEquals(BUFFER_DATA_SIZE * 2 + (createView ? 4 : 0), partition.getTotalNumberOfBytes());
+		// with a view, one buffer remains in nextBuffer and is not counted yet
+		assertEquals(BUFFER_DATA_SIZE + (createView ? 4 : BUFFER_DATA_SIZE), partition.getTotalNumberOfBytes());
 	}
 
 	/**
@@ -699,8 +702,14 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		}
 		// note: in case we create a view, there will be an additional EndOfPartitionEvent
 		assertEquals(createView ? 3 : 2, partition.getTotalNumberOfBuffers());
-		//TODO: re-enable this?
-//		assertEquals((createView ? 4 : 0) + 2 * BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes());
+		if (spilled) {
+			// with a view, one buffer remains in nextBuffer and is not counted yet
+			assertEquals(BUFFER_DATA_SIZE + (createView ? 4 : BUFFER_DATA_SIZE),
+				partition.getTotalNumberOfBytes());
+		} else {
+			// non-spilled byte statistics are only updated when buffers are consumed
+			assertEquals(0, partition.getTotalNumberOfBytes());
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 48846b6..1b861df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -52,15 +52,18 @@ public abstract class SubpartitionTestBase extends TestLogger {
 		try {
 			subpartition.finish();
 			assertEquals(1, subpartition.getTotalNumberOfBuffers());
+			assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming
the buffers
 
 			assertEquals(1, subpartition.getTotalNumberOfBuffers());
 			assertEquals(0, subpartition.getBuffersInBacklog());
+			assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming
the buffers
 
 			BufferConsumer bufferConsumer = createFilledBufferConsumer(4096, 4096);
 
 			assertFalse(subpartition.add(bufferConsumer));
 			assertEquals(1, subpartition.getTotalNumberOfBuffers());
 			assertEquals(0, subpartition.getBuffersInBacklog());
+			assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming
the buffers
 		} finally {
 			if (subpartition != null) {
 				subpartition.release();


Mime
View raw message