flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-2134] Close Netty channel via CloseRequest msg
Date Thu, 04 Jun 2015 09:16:33 GMT
Repository: flink
Updated Branches:
  refs/heads/master 11643c0cc -> 0dea359b3


[FLINK-2134] Close Netty channel via CloseRequest msg

This closes #773.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dea359b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dea359b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dea359b

Branch: refs/heads/master
Commit: 0dea359b30c15abc07b5c9af8f775adf235a6cb0
Parents: 11643c0
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Jun 3 18:41:40 2015 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Jun 4 11:16:03 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyMessage.java  | 20 ++++++++++++++++++++
 .../network/netty/PartitionRequestClient.java   | 15 +++++++++++++--
 .../io/network/netty/PartitionRequestQueue.java |  6 ++++++
 .../netty/PartitionRequestServerHandler.java    |  4 ++++
 .../netty/NettyMessageSerializationTest.java    |  7 +++++++
 5 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 53afd03..1540369 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -150,6 +150,9 @@ abstract class NettyMessage {
 			else if (msgId == CancelPartitionRequest.ID) {
 				decodedMsg = new CancelPartitionRequest();
 			}
+			else if (msgId == CloseRequest.ID) {
+				decodedMsg = new CloseRequest();
+			}
 			else {
 				throw new IllegalStateException("Received unknown message from producer: " + msg);
 			}
@@ -521,6 +524,23 @@ abstract class NettyMessage {
 		}
 	}
 
+	static class CloseRequest extends NettyMessage {
+
+		private static final byte ID = 5;
+
+		public CloseRequest() {
+		}
+
+		@Override
+		ByteBuf write(ByteBufAllocator allocator) throws Exception {
+			return allocateBuffer(allocator, ID, 0);
+		}
+
+		@Override
+		void readFrom(ByteBuf buffer) throws Exception {
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static class ByteBufDataInputView implements DataInputView {

http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 3049af6..78f6398 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -96,6 +96,8 @@ public class PartitionRequestClient {
 			final RemoteInputChannel inputChannel,
 			int delayMs) throws IOException {
 
+		checkNotClosed();
+
 		LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.",
 				subpartitionIndex, partitionId, delayMs);
 
@@ -146,6 +148,7 @@ public class PartitionRequestClient {
 	 * consumer task run pipelined.
 	 */
 	public void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, final RemoteInputChannel inputChannel) throws IOException {
+		checkNotClosed();
 
 		tcpChannel.writeAndFlush(new TaskEventRequest(event, partitionId, inputChannel.getInputChannelId()))
 				.addListener(
@@ -167,8 +170,10 @@ public class PartitionRequestClient {
 		partitionRequestHandler.removeInputChannel(inputChannel);
 
 		if (closeReferenceCounter.decrement()) {
-			// Close the TCP connection
-			tcpChannel.close();
+			// Close the TCP connection. Send a close request msg to ensure
+			// that outstanding backwards task events are not discarded.
+			tcpChannel.writeAndFlush(new NettyMessage.CloseRequest())
+					.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
 
 			// Make sure to remove the client from the factory
 			clientFactory.destroyPartitionRequestClient(connectionId, this);
@@ -177,4 +182,10 @@ public class PartitionRequestClient {
 			partitionRequestHandler.cancelRequestFor(inputChannel.getInputChannelId());
 		}
 	}
+
+	private void checkNotClosed() throws IOException {
+		if (closeReferenceCounter.isDisposed()) {
+			throw new LocalTransportException("Channel closed.", tcpChannel.localAddress());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 5301195..bb8c851 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -79,6 +79,12 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		ctx.pipeline().fireUserEventTriggered(receiverId);
 	}
 
+	public void close() {
+		if (ctx != null) {
+			ctx.channel().close();
+		}
+	}
+
 	@Override
 	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
 		if (msg.getClass() == SequenceNumberingSubpartitionView.class) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 90c93e5..e278d07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
@@ -120,6 +121,9 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 
 				outboundQueue.cancel(request.receiverId);
 			}
+			else if (msgClazz == CloseRequest.class) {
+				outboundQueue.close();
+			}
 			else {
 				LOG.warn("Received unexpected client request: {}", msg);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index b1315be..60241e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -147,6 +147,13 @@ public class NettyMessageSerializationTest {
 
 			assertEquals(expected.receiverId, actual.receiverId);
 		}
+
+		{
+			NettyMessage.CloseRequest expected = new NettyMessage.CloseRequest();
+			NettyMessage.CloseRequest actual = encodeAndDecode(expected);
+
+			assertEquals(expected.getClass(), actual.getClass());
+		}
 	}
 
 	@SuppressWarnings("unchecked")


Mime
View raw message