flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-2089] [runtime] Fix illegal state in RecordWriter after partition write failure
Date Wed, 26 Aug 2015 08:11:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6f07c5f3a -> b2f8e3070


[FLINK-2089] [runtime] Fix illegal state in RecordWriter after partition write failure

- Address PR comments

This closes #1050.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2f8e307
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2f8e307
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2f8e307

Branch: refs/heads/master
Commit: b2f8e3070b8db16093f36680f9c8ce124aae8627
Parents: 6f07c5f
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Aug 19 16:11:13 2015 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Aug 26 10:10:12 2015 +0200

----------------------------------------------------------------------
 .../io/network/api/writer/RecordWriter.java     |  59 +++--
 .../io/network/api/writer/RecordWriterTest.java | 217 +++++++++++--------
 2 files changed, 162 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b2f8e307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 17a6a18..c534aa2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -86,8 +86,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 					Buffer buffer = serializer.getCurrentBuffer();
 
 					if (buffer != null) {
-						writer.writeBuffer(buffer, targetChannel);
-						serializer.clearCurrentBuffer();
+						writeBuffer(buffer, targetChannel, serializer);
 					}
 
 					buffer = writer.getBufferProvider().requestBufferBlocking();
@@ -112,8 +111,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 					Buffer buffer = serializer.getCurrentBuffer();
 
 					if (buffer != null) {
-						writer.writeBuffer(buffer, targetChannel);
-						serializer.clearCurrentBuffer();
+						writeBuffer(buffer, targetChannel, serializer);
 					}
 
 					buffer = writer.getBufferProvider().requestBufferBlocking();
@@ -135,8 +133,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 						throw new IllegalStateException("Serializer has data but no buffer.");
 					}
 
-					writer.writeBuffer(buffer, targetChannel);
-					serializer.clearCurrentBuffer();
+					writeBuffer(buffer, targetChannel, serializer);
 
 					writer.writeEvent(event, targetChannel);
 
@@ -157,8 +154,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 			synchronized (serializer) {
 				Buffer buffer = serializer.getCurrentBuffer();
 				if (buffer != null) {
-					writer.writeBuffer(buffer, targetChannel);
-					serializer.clearCurrentBuffer();
+					writeBuffer(buffer, targetChannel, serializer);
 
 					buffer = writer.getBufferProvider().requestBufferBlocking();
 					serializer.setNextBuffer(buffer);
@@ -174,26 +170,31 @@ public class RecordWriter<T extends IOReadableWritable> {
 			RecordSerializer<T> serializer = serializers[targetChannel];
 
 			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
+				try {
+					Buffer buffer = serializer.getCurrentBuffer();
 
-				if (buffer != null) {
-					// Only clear the serializer after the buffer was written out.
-					writer.writeBuffer(buffer, targetChannel);
+					if (buffer != null) {
+						writeBuffer(buffer, targetChannel, serializer);
+					}
+				} finally {
+					serializer.clear();
 				}
-
-				serializer.clear();
 			}
 		}
 	}
 
 	public void clearBuffers() {
-		for (RecordSerializer<?> s : serializers) {
-			synchronized (s) {
-				Buffer b = s.getCurrentBuffer();
-				s.clear();
+		for (RecordSerializer<?> serializer : serializers) {
+			synchronized (serializer) {
+				try {
+					Buffer buffer = serializer.getCurrentBuffer();
 
-				if (b != null) {
-					b.recycle();
+					if (buffer != null) {
+						buffer.recycle();
+					}
+				}
+				finally {
+					serializer.clear();
 				}
 			}
 		}
@@ -208,4 +209,22 @@ public class RecordWriter<T extends IOReadableWritable> {
 		}
 	}
 
+	/**
+	 * Writes the buffer to the {@link ResultPartitionWriter}.
+	 *
+	 * <p> The buffer is cleared from the serializer state after a call to this method.
+	 */
+	private void writeBuffer(
+			Buffer buffer,
+			int targetChannel,
+			RecordSerializer<T> serializer) throws IOException {
+
+		try {
+			writer.writeBuffer(buffer, targetChannel);
+		}
+		finally {
+			serializer.clearCurrentBuffer();
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b2f8e307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 7061fb8..9e10582 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -19,9 +19,13 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.types.IntValue;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.invocation.InvocationOnMock;
@@ -37,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -59,100 +64,6 @@ public class RecordWriterTest {
 	// Resource release tests
 	// ---------------------------------------------------------------------------------------------
 
-	@Test
-	public void testClearBuffersAfterEmit() throws Exception {
-		final Buffer buffer = TestBufferFactory.createBuffer(32);
-
-		BufferProvider bufferProvider = createBufferProvider(buffer);
-		ResultPartitionWriter partitionWriter = createResultPartitionWriter(bufferProvider);
-
-		RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
-
-		// Emit single record, the buffer will not be written out.
-		// Therefore, it needs to be cleared explicitly.
-		recordWriter.emit(new IntValue(0));
-
-		// Verify that a buffer is requested, but not written out.
-		verify(bufferProvider, times(1)).requestBufferBlocking();
-		verify(partitionWriter, never()).writeBuffer(any(Buffer.class), anyInt());
-
-		recordWriter.clearBuffers();
-
-		assertTrue("Buffer not recycled.", buffer.isRecycled());
-	}
-
-	@Test
-	public void testClearBuffersAfterExceptionInFlushWritePartition() throws Exception {
-		// Size of buffer ensures that a single record will fill the buffer.
-		final Buffer buffer = TestBufferFactory.createBuffer(4);
-
-		BufferProvider bufferProvider = createBufferProvider(buffer);
-		ResultPartitionWriter partitionWriter = createResultPartitionWriter(bufferProvider);
-
-		doThrow(new IOException("Expected test exception"))
-				.when(partitionWriter).writeBuffer(eq(buffer), eq(0));
-
-		RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
-
-		try {
-			// Emit single record, the buffer will not be written out,
-			// because of the Exception. Therefore, it needs to be cleared
-			// explicitly.
-			recordWriter.emit(new IntValue(0));
-
-			fail("Did not throw expected Exception. This means that the record "
-					+ "writer did not request a buffer as expected.");
-		}
-		catch (IOException expected) {
-		}
-
-		// Verify that a buffer is requested, but not written out due to the Exception.
-		verify(bufferProvider, times(1)).requestBufferBlocking();
-		verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt());
-
-		recordWriter.clearBuffers();
-
-		assertTrue("Buffer not recycled.", buffer.isRecycled());
-
-	}
-
-	@Test
-	public void testClearBuffersAfterExceptionInEmitWritePartition() throws Exception {
-		// Size of buffer ensures that a single record will NOT fill the buffer.
-		final Buffer buffer = TestBufferFactory.createBuffer(32);
-
-		BufferProvider bufferProvider = createBufferProvider(buffer);
-		ResultPartitionWriter partitionWriter = createResultPartitionWriter(bufferProvider);
-
-		doThrow(new IOException("Expected test exception"))
-				.when(partitionWriter).writeBuffer(eq(buffer), eq(0));
-
-		RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
-
-		try {
-			recordWriter.emit(new IntValue(0));
-
-			// Verify that a buffer is requested, but not written out.
-			verify(bufferProvider, times(1)).requestBufferBlocking();
-			verify(partitionWriter, never()).writeBuffer(any(Buffer.class), anyInt());
-
-			// Now flush the record.
-			recordWriter.flush();
-
-			fail("Did not throw expected Exception. This means that the record "
-					+ "writer did not request a buffer as expected.");
-		}
-		catch (IOException expected) {
-		}
-
-		// Flushing the buffer tried to write out the buffer.
-		verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt());
-
-		recordWriter.clearBuffers();
-
-		assertTrue("Buffer not recycled.", buffer.isRecycled());
-	}
-
 	/**
 	 * Tests a fix for FLINK-2089.
 	 *
@@ -242,6 +153,124 @@ public class RecordWriterTest {
 	}
 
 	@Test
+	public void testClearBuffersAfterExceptionInPartitionWriter() throws Exception {
+		NetworkBufferPool buffers = null;
+		BufferPool bufferPool = null;
+
+		try {
+			buffers = new NetworkBufferPool(1, 1024);
+			bufferPool = spy(buffers.createBufferPool(1, true));
+
+			ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
+			when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));
+			when(partitionWriter.getNumberOfOutputChannels()).thenReturn(1);
+
+			// Recycle buffer and throw Exception
+			doAnswer(new Answer<Void>() {
+				@Override
+				public Void answer(InvocationOnMock invocation) throws Throwable {
+					Buffer buffer = (Buffer) invocation.getArguments()[0];
+					buffer.recycle();
+
+					throw new RuntimeException("Expected test Exception");
+				}
+			}).when(partitionWriter).writeBuffer(any(Buffer.class), anyInt());
+
+			RecordWriter<IntValue> recordWriter = new RecordWriter<>(partitionWriter);
+
+			try {
+				// Verify that emit correctly clears the buffer. The infinite loop looks
+				// dangerous indeed, but the buffer will only be flushed after its full. Adding a
+				// manual flush here doesn't test this case (see next).
+				for (;;) {
+					recordWriter.emit(new IntValue(0));
+				}
+			}
+			catch (Exception e) {
+				// Verify that the buffer is not part of the record writer state after a failure
+				// to flush it out. If the buffer is still part of the record writer state, this
+				// will fail, because the buffer has already been recycled. NOTE: The mock
+				// partition writer needs to recycle the buffer to correctly test this.
+				recordWriter.clearBuffers();
+			}
+
+			// Verify expected methods have been called
+			verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt());
+			verify(bufferPool, times(1)).requestBufferBlocking();
+
+			try {
+				// Verify that manual flushing correctly clears the buffer.
+				recordWriter.emit(new IntValue(0));
+				recordWriter.flush();
+
+				Assert.fail("Did not throw expected test Exception");
+			}
+			catch (Exception e) {
+				recordWriter.clearBuffers();
+			}
+
+			// Verify expected methods have been called
+			verify(partitionWriter, times(2)).writeBuffer(any(Buffer.class), anyInt());
+			verify(bufferPool, times(2)).requestBufferBlocking();
+
+			try {
+				// Verify that broadcast emit correctly clears the buffer.
+				for (;;) {
+					recordWriter.broadcastEmit(new IntValue(0));
+				}
+			}
+			catch (Exception e) {
+				recordWriter.clearBuffers();
+			}
+
+			// Verify expected methods have been called
+			verify(partitionWriter, times(3)).writeBuffer(any(Buffer.class), anyInt());
+			verify(bufferPool, times(3)).requestBufferBlocking();
+
+			try {
+				// Verify that end of super step correctly clears the buffer.
+				recordWriter.emit(new IntValue(0));
+				recordWriter.sendEndOfSuperstep();
+
+				Assert.fail("Did not throw expected test Exception");
+			}
+			catch (Exception e) {
+				recordWriter.clearBuffers();
+			}
+
+			// Verify expected methods have been called
+			verify(partitionWriter, times(4)).writeBuffer(any(Buffer.class), anyInt());
+			verify(bufferPool, times(4)).requestBufferBlocking();
+
+			try {
+				// Verify that broadcasting and event correctly clears the buffer.
+				recordWriter.emit(new IntValue(0));
+				recordWriter.broadcastEvent(new TestTaskEvent());
+
+				Assert.fail("Did not throw expected test Exception");
+			}
+			catch (Exception e) {
+				recordWriter.clearBuffers();
+			}
+
+			// Verify expected methods have been called
+			verify(partitionWriter, times(5)).writeBuffer(any(Buffer.class), anyInt());
+			verify(bufferPool, times(5)).requestBufferBlocking();
+		}
+		finally {
+			if (bufferPool != null) {
+				assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
+				bufferPool.lazyDestroy();
+			}
+
+			if (buffers != null) {
+				assertEquals(1, buffers.getNumberOfAvailableMemorySegments());
+				buffers.destroy();
+			}
+		}
+	}
+
+	@Test
 	public void testSerializerClearedAfterClearBuffers() throws Exception {
 
 		final Buffer buffer = TestBufferFactory.createBuffer(16);


Mime
View raw message