flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [27/50] [abbrv] flink git commit: [FLINK-3369] [runtime] Make RemoteTransportException instance of CancelTaskException
Date Fri, 12 Feb 2016 11:29:52 GMT
[FLINK-3369] [runtime] Make RemoteTransportException instance of CancelTaskException

Problem: RemoteTransportException (RTE) is thrown on data transfer failures
when the remote data producer fails. Because RTE is an instance of IOException,
the RTE can end up being reported as the root cause of the job failure.

Solution: Make RTE an instance of CancelTaskException, so that it leads to
cancellation of the task rather than failure.

Squashes the following commit:

[pr-comments] Add remote address to RemoteTransportException

This closes #1621.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf3ae88b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf3ae88b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf3ae88b

Branch: refs/heads/tableOnCalcite
Commit: cf3ae88b73e30a2d69ac1cc6009a8304ea3f53cc
Parents: fd324ea
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Feb 10 19:51:20 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Feb 11 14:39:40 2016 +0100

----------------------------------------------------------------------
 .../runtime/execution/CancelTaskException.java  |  4 ++
 .../network/netty/PartitionRequestClient.java   |  6 +--
 .../netty/PartitionRequestClientFactory.java    |  4 +-
 .../netty/PartitionRequestClientHandler.java    | 17 +++-----
 .../exception/LocalTransportException.java      | 22 +++++++---
 .../exception/RemoteTransportException.java     | 36 +++++++++++++---
 .../netty/exception/TransportException.java     | 43 --------------------
 7 files changed, 62 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
index 3bcbe2e..ebf58ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
@@ -34,6 +34,10 @@ public class CancelTaskException extends RuntimeException {
 		super(msg);
 	}
 
+	public CancelTaskException(String msg, Throwable cause) {
+		super(msg, cause);
+	}
+
 	public CancelTaskException() {
 		super();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index f6120d4..fb24a8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -114,7 +114,7 @@ public class PartitionRequestClient {
 					inputChannel.onError(
 							new LocalTransportException(
 									"Sending the partition request failed.",
-									future.channel().localAddress(), future.cause()
+									future.cause()
 							));
 				}
 			}
@@ -158,7 +158,7 @@ public class PartitionRequestClient {
 								if (!future.isSuccess()) {
 									inputChannel.onError(new LocalTransportException(
 											"Sending the task event failed.",
-											future.channel().localAddress(), future.cause()
+											future.cause()
 									));
 								}
 							}
@@ -185,7 +185,7 @@ public class PartitionRequestClient {
 
 	private void checkNotClosed() throws IOException {
 		if (closeReferenceCounter.isDisposed()) {
-			throw new LocalTransportException("Channel closed.", tcpChannel.localAddress());
+			throw new LocalTransportException("Channel " + tcpChannel.localAddress() + "closed.");
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 040a8ef..8eae035 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -216,12 +216,12 @@ class PartitionRequestClientFactory {
 						"Connecting to remote task manager + '" + connectionId.getAddress() +
 								"' has failed. This might indicate that the remote task " +
 								"manager has been lost.",
-						connectionId.getAddress(), future.cause()));
+						future.cause(), connectionId.getAddress()));
 			}
 			else {
 				notifyOfError(new LocalTransportException(
 						"Connecting to remote task manager + '" + connectionId.getAddress() +
-								"' has been cancelled.", null));
+								"' has been cancelled."));
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index ee015c2..afcd881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
-import org.apache.flink.runtime.io.network.netty.exception.TransportException;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
@@ -133,27 +132,23 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter
{
 	@Override
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
 
-		if (cause instanceof TransportException) {
+		if (cause instanceof LocalTransportException || cause instanceof RemoteTransportException)
{
 			notifyAllChannelsOfErrorAndClose(cause);
 		}
 		else {
 			final SocketAddress remoteAddr = ctx.channel().remoteAddress();
 
-			final TransportException tex;
-
 			// Improve on the connection reset by peer error message
 			if (cause instanceof IOException
 					&& cause.getMessage().equals("Connection reset by peer")) {
 
-				tex = new RemoteTransportException(
+				notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
 						"Lost connection to task manager '" + remoteAddr + "'. This indicates "
-								+ "that the remote task manager was lost.", remoteAddr, cause);
+								+ "that the remote task manager was lost.", cause, remoteAddr));
 			}
 			else {
-				tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause);
+				notifyAllChannelsOfErrorAndClose(new LocalTransportException(cause.getMessage(), cause));
 			}
-
-			notifyAllChannelsOfErrorAndClose(tex);
 		}
 	}
 
@@ -228,7 +223,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter
{
 			if (error.isFatalError()) {
 				notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
 						"Fatal error at remote task manager '" + remoteAddr + "'.",
-						remoteAddr, error.cause));
+						error.cause, remoteAddr));
 			}
 			else {
 				RemoteInputChannel inputChannel = inputChannels.get(error.receiverId);
@@ -240,7 +235,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter
{
 					else {
 						inputChannel.onError(new RemoteTransportException(
 								"Error at remote task manager '" + remoteAddr + "'.",
-										remoteAddr, error.cause));
+										error.cause, remoteAddr));
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
index 37f6e53..7851953 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
@@ -18,17 +18,27 @@
 
 package org.apache.flink.runtime.io.network.netty.exception;
 
-import java.net.SocketAddress;
+import java.io.IOException;
 
-public class LocalTransportException extends TransportException {
+/**
+ * Exception thrown on local transport failures.
+ *
+ * <p>If you get this type of exception at task manager T, it means that
+ * something went wrong in the local network stack of task manager T.
+ */
+public class LocalTransportException extends IOException {
 
 	private static final long serialVersionUID = 2366708881288640674L;
 
-	public LocalTransportException(String message, SocketAddress address) {
-		super(message, address);
+	public LocalTransportException() {
+		super();
+	}
+
+	public LocalTransportException(String message) {
+		super(message);
 	}
 
-	public LocalTransportException(String message, SocketAddress address, Throwable cause) {
-		super(message, address, cause);
+	public LocalTransportException(String message, Throwable cause) {
+		super(message, cause);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
index 5f81883..1a83a91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
@@ -18,17 +18,43 @@
 
 package org.apache.flink.runtime.io.network.netty.exception;
 
+import org.apache.flink.runtime.execution.CancelTaskException;
+
 import java.net.SocketAddress;
 
-public class RemoteTransportException extends TransportException {
+/**
+ * Exception thrown on remote transport failures.
+ *
+ * <p>If you get this type of exception at task manager T, it means that
+ * something went wrong at the network stack of another task manager (not T).
+ * It is not an issue at the task, which throws the Exception.
+ */
+public class RemoteTransportException extends CancelTaskException {
 
 	private static final long serialVersionUID = 4373615529545893089L;
 
-	public RemoteTransportException(String message, SocketAddress address) {
-		super(message, address);
+	/** Address of the remote task manager that caused this Exception. */
+	private final SocketAddress remoteAddress;
+
+	public RemoteTransportException() {
+		this(null, null, null);
+	}
+
+	public RemoteTransportException(String msg, SocketAddress remoteAddress) {
+		this(msg, null, remoteAddress);
+	}
+
+	public RemoteTransportException(String msg, Throwable cause, SocketAddress remoteAddress)
{
+		super(msg, cause);
+		this.remoteAddress = remoteAddress;
 	}
 
-	public RemoteTransportException(String message, SocketAddress address, Throwable cause)
{
-		super(message, address, cause);
+	/**
+	 * Returns the address of the task manager causing this Exception.
+	 *
+	 * @return Address of the remote task manager causing this Exception
+	 */
+	public SocketAddress getRemoteAddress() {
+		return remoteAddress;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
deleted file mode 100644
index 0438688..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty.exception;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-
-public abstract class TransportException extends IOException {
-
-	private static final long serialVersionUID = 3637820720589866570L;
-
-	private final SocketAddress address;
-
-	public TransportException(String message, SocketAddress address) {
-		this(message, address, null);
-	}
-
-	public TransportException(String message, SocketAddress address, Throwable cause) {
-		super(message, cause);
-
-		this.address = address;
-	}
-
-	public SocketAddress getAddress() {
-		return address;
-	}
-}


Mime
View raw message