flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/4] flink git commit: [FLINK-1954] [FLINK-1958] [runtime] Cancel transfers of failed receiver tasks
Date Mon, 01 Jun 2015 07:24:54 GMT
[FLINK-1954] [FLINK-1958] [runtime] Cancel transfers of failed receiver tasks


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dce1be18
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dce1be18
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dce1be18

Branch: refs/heads/master
Commit: dce1be18593539ff29c3d55c5f2c1208a2e54c10
Parents: 5b79957
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu May 28 11:45:52 2015 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Jun 1 09:21:55 2015 +0200

----------------------------------------------------------------------
 .../network/netty/PartitionRequestClient.java   | 10 ++-
 .../netty/PartitionRequestClientHandler.java    | 23 +++----
 .../io/network/netty/PartitionRequestQueue.java | 27 +++++++-
 .../partition/consumer/RemoteInputChannel.java  |  6 +-
 .../netty/CancelPartitionRequestTest.java       | 68 +++++++++++++++++++-
 5 files changed, 111 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dce1be18/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index cb4ecb6..3049af6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -58,7 +58,12 @@ public class PartitionRequestClient {
 	// If zero, the underlying TCP channel can be safely closed
 	private final AtomicDisposableReferenceCounter closeReferenceCounter = new AtomicDisposableReferenceCounter();
 
-	PartitionRequestClient(Channel tcpChannel, PartitionRequestClientHandler partitionRequestHandler, ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
+	PartitionRequestClient(
+			Channel tcpChannel,
+			PartitionRequestClientHandler partitionRequestHandler,
+			ConnectionID connectionId,
+			PartitionRequestClientFactory clientFactory) {
+
 		this.tcpChannel = checkNotNull(tcpChannel);
 		this.partitionRequestHandler = checkNotNull(partitionRequestHandler);
 		this.connectionId = checkNotNull(connectionId);
@@ -168,5 +173,8 @@ public class PartitionRequestClient {
 			// Make sure to remove the client from the factory
 			clientFactory.destroyPartitionRequestClient(connectionId, this);
 		}
+		else {
+			partitionRequestHandler.cancelRequestFor(inputChannel.getInputChannelId());
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dce1be18/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 5964b49..985e2a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.flink.core.memory.MemorySegment;
@@ -39,7 +39,6 @@ import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.ArrayDeque;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,7 +64,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 	 * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared
 	 * while data is still coming in for this channel.
 	 */
-	private volatile Set<InputChannelID> cancelled;
+	private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();
 
 	private ChannelHandlerContext ctx;
 
@@ -85,6 +84,12 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 		inputChannels.remove(listener.getInputChannelId());
 	}
 
+	void cancelRequestFor(InputChannelID inputChannelId) {
+		if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) {
+			ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Network events
 	// ------------------------------------------------------------------------
@@ -183,18 +188,6 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 		}
 	}
 
-	private void cancelRequestFor(InputChannelID inputChannelId) {
-		if (cancelled == null) {
-			cancelled = Sets.newConcurrentHashSet();
-		}
-
-		if (!cancelled.contains(inputChannelId)) {
-			ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
-
-			cancelled.add(inputChannelId);
-		}
-	}
-
 	// ------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/dce1be18/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 279ddbe..c121091 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
@@ -51,6 +53,8 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 
 	private final Queue<SequenceNumberingSubpartitionView> queue = new ArrayDeque<SequenceNumberingSubpartitionView>();
 
+	private final Set<InputChannelID> released = Sets.newHashSet();
+
 	private SequenceNumberingSubpartitionView currentPartitionQueue;
 
 	private boolean fatalError;
@@ -88,9 +92,14 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		else if (msg.getClass() == InputChannelID.class) {
 			InputChannelID toCancel = (InputChannelID) msg;
 
+			if (released.contains(toCancel)) {
+				return;
+			}
+
 			// Cancel the request for the input channel
 			if (currentPartitionQueue != null && currentPartitionQueue.getReceiverId().equals(toCancel)) {
 				currentPartitionQueue.releaseAllResources();
+				markAsReleased(currentPartitionQueue.receiverId);
 				currentPartitionQueue = null;
 			}
 			else {
@@ -101,6 +110,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 
 					if (curr.getReceiverId().equals(toCancel)) {
 						curr.releaseAllResources();
+						markAsReleased(curr.receiverId);
 					}
 					else {
 						queue.add(curr);
@@ -139,6 +149,8 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 							currentPartitionQueue = null;
 						}
 						else if (currentPartitionQueue.isReleased()) {
+							markAsReleased(currentPartitionQueue.getReceiverId());
+
 							currentPartitionQueue = null;
 						}
 					}
@@ -150,8 +162,9 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 
 							currentPartitionQueue.notifySubpartitionConsumed();
 							currentPartitionQueue.releaseAllResources();
-							currentPartitionQueue = null;
+							markAsReleased(currentPartitionQueue.getReceiverId());
 
+							currentPartitionQueue = null;
 						}
 
 						channel.writeAndFlush(resp).addListener(writeListener);
@@ -194,14 +207,25 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 	private void releaseAllResources() throws IOException {
 		if (currentPartitionQueue != null) {
 			currentPartitionQueue.releaseAllResources();
+			markAsReleased(currentPartitionQueue.getReceiverId());
+
 			currentPartitionQueue = null;
 		}
 
 		while ((currentPartitionQueue = queue.poll()) != null) {
 			currentPartitionQueue.releaseAllResources();
+
+			markAsReleased(currentPartitionQueue.getReceiverId());
 		}
 	}
 
+	/**
+	 * Marks a receiver as released.
+	 */
+	private void markAsReleased(InputChannelID receiverId) {
+		released.add(receiverId);
+	}
+
 	// This listener is called after an element of the current queue has been
 	// flushed. If successful, the listener triggers further processing of the
 	// queues.
@@ -285,5 +309,4 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 			ctx.pipeline().fireUserEventTriggered(this);
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dce1be18/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 090e94d..b70c3a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -75,7 +75,7 @@ public class RemoteInputChannel extends InputChannel {
 	 */
 	private int expectedSequenceNumber = 0;
 
-	RemoteInputChannel(
+	public RemoteInputChannel(
 			SingleInputGate inputGate,
 			int channelIndex,
 			ResultPartitionID partitionId,
@@ -86,7 +86,7 @@ public class RemoteInputChannel extends InputChannel {
 				new Tuple2<Integer, Integer>(0, 0));
 	}
 
-	RemoteInputChannel(
+	public RemoteInputChannel(
 			SingleInputGate inputGate,
 			int channelIndex,
 			ResultPartitionID partitionId,
@@ -193,6 +193,8 @@ public class RemoteInputChannel extends InputChannel {
 				}
 			}
 
+			// The released flag has to be set before closing the connection to ensure that
+			// buffers received concurrently with closing are properly recycled.
 			if (partitionRequestClient != null) {
 				partitionRequestClient.close(this);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/dce1be18/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 75f4284..8f443a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -38,7 +38,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.*;
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
@@ -46,13 +47,17 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class CancelPartitionRequestTest {
 
 	/**
 	 * Verifies that requests for non-existing (failed/cancelled) input channels are properly
-	 * cancelled.
+	 * cancelled. The receiver receives data, but there is no input channel to receive the data.
+	 * This should cancel the request.
 	 */
 	@Test
 	public void testCancelPartitionRequest() throws Exception {
@@ -68,9 +73,11 @@ public class CancelPartitionRequestTest {
 
 			CountDownLatch sync = new CountDownLatch(1);
 
+			ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
+
 			// Return infinite subpartition
 			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class)))
-					.thenReturn(new InfiniteSubpartitionView(outboundBuffers, sync));
+					.thenReturn(view);
 
 			PartitionRequestProtocol protocol = new PartitionRequestProtocol(
 					partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
@@ -87,6 +94,61 @@ public class CancelPartitionRequestTest {
 				fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() +
 						" ms to be notified about cancelled partition.");
 			}
+
+			verify(view, times(1)).releaseAllResources();
+			verify(view, times(0)).notifySubpartitionConsumed();
+		}
+		finally {
+			shutdown(serverAndClient);
+		}
+	}
+
+	@Test
+	public void testDuplicateCancel() throws Exception {
+
+		NettyServerAndClient serverAndClient = null;
+
+		try {
+			TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
+
+			ResultPartitionManager partitions = mock(ResultPartitionManager.class);
+
+			ResultPartitionID pid = new ResultPartitionID();
+
+			CountDownLatch sync = new CountDownLatch(1);
+
+			ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
+
+			// Return infinite subpartition
+			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class)))
+					.thenReturn(view);
+
+			PartitionRequestProtocol protocol = new PartitionRequestProtocol(
+					partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
+
+			serverAndClient = initServerAndClient(protocol);
+
+			Channel ch = connect(serverAndClient);
+
+			// Request for non-existing input channel => results in cancel request
+			InputChannelID inputChannelId = new InputChannelID();
+
+			ch.writeAndFlush(new PartitionRequest(pid, 0, inputChannelId)).await();
+
+			// Wait for the notification
+			if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
+				fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() +
+						" ms to be notified about cancelled partition.");
+			}
+
+			ch.writeAndFlush(new CancelPartitionRequest(inputChannelId)).await();
+
+			ch.close();
+
+			NettyTestUtil.awaitClose(ch);
+
+			verify(view, times(1)).releaseAllResources();
+			verify(view, times(0)).notifySubpartitionConsumed();
 		}
 		finally {
 			shutdown(serverAndClient);


Mime
View raw message