flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-5556] [checkpointing] Report correct buffered bytes during alignment
Date Thu, 19 Jan 2017 16:16:51 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6fb9ebc4f -> e1b2cd01c


[FLINK-5556] [checkpointing] Report correct buffered bytes during alignment

This closes #3164.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1b2cd01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1b2cd01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1b2cd01

Branch: refs/heads/master
Commit: e1b2cd01cb85c59dc00b19c668b2aed649fcbb36
Parents: 6fb9ebc
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Jan 18 18:14:16 2017 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Jan 19 17:16:31 2017 +0100

----------------------------------------------------------------------
 .../streaming/runtime/io/BarrierBuffer.java     |  4 ++-
 .../streaming/runtime/io/BufferSpiller.java     |  3 ++
 .../streaming/runtime/io/BarrierBufferTest.java | 35 ++++++++++++++++----
 .../streaming/runtime/io/BufferSpillerTest.java | 17 +++++++++-
 4 files changed, 51 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1b2cd01/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0baf126..e91c26a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -361,8 +361,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			CheckpointMetaData checkpointMetaData =
 					new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
 
+			long bytesBuffered = currentBuffered != null ? currentBuffered.size() : 0L;
+
 			checkpointMetaData
-					.setBytesBufferedInAlignment(bufferSpiller.getBytesWritten())
+					.setBytesBufferedInAlignment(bytesBuffered)
 					.setAlignmentDurationNanos(latestAlignmentDurationNanos);
 
 			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);

http://git-wip-us.apache.org/repos/asf/flink/blob/e1b2cd01/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 5133351..0e3ee22 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -52,6 +52,9 @@ import org.apache.flink.util.StringUtils;
 @Internal
 public class BufferSpiller {
 
+	/** Size of header in bytes (see add method). */
+	static final int HEADER_SIZE = 9;
+
 	/** The counter that selects the next directory to spill into */
 	private static final AtomicInteger DIRECTORY_INDEX = new AtomicInteger(0);
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/e1b2cd01/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 446cc77..d17225c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -33,10 +33,8 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.TaskStateHandles;
-
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -50,7 +48,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.argThat;
@@ -262,6 +259,7 @@ public class BarrierBufferTest {
 			check(sequence[5], buffer.getNextNonBlocked());
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
 			validateAlignmentTime(startTs, buffer);
+			validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), sequence[5], sequence[6]);
 
 			check(sequence[6], buffer.getNextNonBlocked());
 
@@ -278,11 +276,13 @@ public class BarrierBufferTest {
 			check(sequence[17], buffer.getNextNonBlocked());
 			assertEquals(3L, handler.getNextExpectedCheckpointId());
 			validateAlignmentTime(startTs, buffer);
+			validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment());
 
 			check(sequence[18], buffer.getNextNonBlocked());
 
 			// checkpoint 3 starts, data buffered
 			check(sequence[20], buffer.getNextNonBlocked());
+			validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), sequence[20], sequence[21]);
 			assertEquals(4L, handler.getNextExpectedCheckpointId());
 			check(sequence[21], buffer.getNextNonBlocked());
 
@@ -290,7 +290,10 @@ public class BarrierBufferTest {
 
 			// pre checkpoint 5
 			check(sequence[27], buffer.getNextNonBlocked());
+
+			validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment());
 			assertEquals(5L, handler.getNextExpectedCheckpointId());
+
 			check(sequence[28], buffer.getNextNonBlocked());
 			check(sequence[29], buffer.getNextNonBlocked());
 			
@@ -311,10 +314,13 @@ public class BarrierBufferTest {
 			check(sequence[42], buffer.getNextNonBlocked());
 			check(sequence[43], buffer.getNextNonBlocked());
 			check(sequence[44], buffer.getNextNonBlocked());
-			
+
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
 
+			validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(),
+				sequence[34], sequence[36], sequence[38], sequence[39]);
+
 			buffer.cleanup();
 
 			checkNoTempFilesRemain();
@@ -1442,13 +1448,25 @@ public class BarrierBufferTest {
 		assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
 	}
 
+	private static void validateAlignmentBuffered(long actualBytesBuffered, BufferOrEvent... sequence) {
+		long expectedBuffered = 0;
+		for (BufferOrEvent boe : sequence) {
+			if (boe.isBuffer()) {
+				expectedBuffered += BufferSpiller.HEADER_SIZE + boe.getBuffer().getSize();
+			}
+		}
+
+		assertEquals("Wrong alignment buffered bytes", actualBytesBuffered, expectedBuffered);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 
 	private static class ValidatingCheckpointHandler implements StatefulTask {
-		
+
 		private long nextExpectedCheckpointId = -1L;
+		private long lastReportedBytesBufferedInAlignment = -1;
 
 		public void setNextExpectedCheckpointId(long nextExpectedCheckpointId) {
 			this.nextExpectedCheckpointId = nextExpectedCheckpointId;
@@ -1458,6 +1476,10 @@ public class BarrierBufferTest {
 			return nextExpectedCheckpointId;
 		}
 
+		long getLastReportedBytesBufferedInAlignment() {
+			return lastReportedBytesBufferedInAlignment;
+		}
+
 		@Override
 		public void setInitialState(TaskStateHandles taskStateHandles) throws Exception {
 			throw new UnsupportedOperationException("should never be called");
@@ -1479,6 +1501,7 @@ public class BarrierBufferTest {
 			assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0);
 
 			nextExpectedCheckpointId++;
+			lastReportedBytesBufferedInAlignment = checkpointMetaData.getBytesBufferedInAlignment();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e1b2cd01/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index e85eddb..24aabcc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -331,7 +331,22 @@ public class BufferSpillerTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	/**
+	 * Tests that the static HEADER_SIZE field has valid header size.
+	 */
+	@Test
+	public void testHeaderSizeStaticField() throws Exception {
+		int size = 13;
+		BufferOrEvent boe = generateRandomBuffer(size, 0);
+		spiller.add(boe);
+
+		assertEquals(
+			"Changed the header format, but did not adjust the HEADER_SIZE field",
+			BufferSpiller.HEADER_SIZE + size,
+			spiller.getBytesWritten());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utils
 	// ------------------------------------------------------------------------


Mime
View raw message