flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/2] flink git commit: [FLINK-4021] [network] Add test for staged buffers auto read behaviour
Date Fri, 19 Aug 2016 12:20:04 GMT
[FLINK-4021] [network] Add test for staged buffers auto read behaviour

This closes #2141.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7dbcffb9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7dbcffb9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7dbcffb9

Branch: refs/heads/master
Commit: 7dbcffb901a2a0e1b64b9ae78977726febe29999
Parents: 6615ee7
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Aug 15 15:59:09 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Fri Aug 19 12:28:15 2016 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyMessage.java  |  2 +-
 .../PartitionRequestClientHandlerTest.java      | 86 ++++++++++++++++++++
 .../runtime/testutils/DiscardingRecycler.java   |  2 +
 3 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7dbcffb9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 3a24181..2b03f1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -192,7 +192,7 @@ abstract class NettyMessage {
 			buffer = null;
 		}
 
-		BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId) {
+		public BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId) {
 			this.buffer = buffer;
 			this.sequenceNumber = sequenceNumber;
 			this.receiverId = receiverId;

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbcffb9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 2c08cc5..26d791f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -22,6 +22,9 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
@@ -31,11 +34,17 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -158,8 +167,85 @@ public class PartitionRequestClientHandlerTest {
 		client.cancelRequestFor(inputChannel.getInputChannelId());
 	}
 
+	/**
+	 * Tests that an unsuccessful message decode call for a staged message
+	 * does not leave the channel with auto read set to false.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testAutoReadAfterUnsuccessfulStagedMessage() throws Exception {
+		PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
+		EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+		final AtomicReference<EventListener<Buffer>> listener = new AtomicReference<>();
+
+		BufferProvider bufferProvider = mock(BufferProvider.class);
+		when(bufferProvider.addListener(any(EventListener.class))).thenAnswer(new Answer<Boolean>() {
+			@Override
+			@SuppressWarnings("unchecked")
+			public Boolean answer(InvocationOnMock invocation) throws Throwable {
+				listener.set((EventListener<Buffer>) invocation.getArguments()[0]);
+				return true;
+			}
+		});
+
+		when(bufferProvider.requestBuffer()).thenReturn(null);
+
+		InputChannelID channelId = new InputChannelID(0, 0);
+		RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
+		when(inputChannel.getInputChannelId()).thenReturn(channelId);
+
+		// The 3rd staged msg has a null buffer provider
+		when(inputChannel.getBufferProvider()).thenReturn(bufferProvider, bufferProvider, null);
+
+		handler.addInputChannel(inputChannel);
+
+		BufferResponse msg = createBufferResponse(createBuffer(true), 0, channelId);
+
+		// Write 1st buffer msg. No buffer is available, therefore the buffer
+		// should be staged and auto read should be set to false.
+		assertTrue(channel.config().isAutoRead());
+		channel.writeInbound(msg);
+
+		// No buffer available, auto read false
+		assertFalse(channel.config().isAutoRead());
+
+		// Write more buffers... all staged.
+		msg = createBufferResponse(createBuffer(true), 1, channelId);
+		channel.writeInbound(msg);
+
+		msg = createBufferResponse(createBuffer(true), 2, channelId);
+		channel.writeInbound(msg);
+
+		// Notify about buffer => handle 1st msg
+		Buffer availableBuffer = createBuffer(false);
+		listener.get().onEvent(availableBuffer);
+
+		// Start processing of staged buffers (in run pending tasks). Make
+		// sure that the buffer provider acts like it's destroyed.
+		when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false);
+		when(bufferProvider.isDestroyed()).thenReturn(true);
+
+		// Execute all tasks that are scheduled in the event loop. Further
+		// eventLoop().execute() calls are directly executed, if they are
+		// called in the scope of this call.
+		channel.runPendingTasks();
+
+		assertTrue(channel.config().isAutoRead());
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
+	private static Buffer createBuffer(boolean fill) {
+		MemorySegment segment = HeapMemorySegment.FACTORY.allocateUnpooledSegment(1024, null);
+		if (fill) {
+			for (int i = 0; i < 1024; i++) {
+				segment.put(i, (byte) i);
+			}
+		}
+		return new Buffer(segment, DiscardingRecycler.INSTANCE, true);
+	}
+
 	/**
 	 * Returns a deserialized buffer message as it would be received during runtime.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbcffb9/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
index 466d1d6..23f8224 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 
 public class DiscardingRecycler implements BufferRecycler {
 
+	public static final BufferRecycler INSTANCE = new DiscardingRecycler();
+
 	@Override
 	public void recycle(MemorySegment memSeg) {
 		memSeg.free();


Mime
View raw message