flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7898] [flip6] Don't propagate CancellationExceptions in RegisteredRpcConnection
Date Mon, 23 Oct 2017 06:21:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master 94f1d2b16 -> 14484db38


[FLINK-7898] [flip6] Don't propagate CancellationExceptions in RegisteredRpcConnection

The RegisteredRpcConnection should not call the exception handler when the
RetryingRegistration future failed with a cancellation exception, because this
is how the future is completed in case of stopping the RetryingRegistration.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14484db3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14484db3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14484db3

Branch: refs/heads/master
Commit: 14484db3866df82537eee0f707d813bbf398bfd8
Parents: 94f1d2b
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Sun Oct 22 13:38:50 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Oct 23 08:21:05 2017 +0200

----------------------------------------------------------------------
 .../runtime/registration/RegisteredRpcConnection.java    | 11 +++++++++--
 .../flink/runtime/registration/RetryingRegistration.java |  2 +-
 2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14484db3/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index a585f0d..c76bcf8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.slf4j.Logger;
 
 import java.io.Serializable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -91,9 +92,15 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
 
 		future.whenCompleteAsync(
 			(Tuple2<G, S> result, Throwable failure) -> {
-				// this future should only ever fail if there is a bug, not if the registration is declined
 				if (failure != null) {
-					onRegistrationFailure(failure);
+					if (failure instanceof CancellationException) {
+						// we ignore cancellation exceptions because they originate from cancelling
+						// the RetryingRegistration
+						log.debug("Retrying registration towards {} was cancelled.", targetAddress);
+					} else {
+						// this future should only ever fail if there is a bug, not if the registration is declined
+						onRegistrationFailure(failure);
+					}
 				} else {
 					targetGateway = result.f0;
 					onRegistrationSuccess(result.f1);

http://git-wip-us.apache.org/repos/asf/flink/blob/14484db3/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index 378ac6a..802d361 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -151,8 +151,8 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
 	 * Cancels the registration procedure.
 	 */
 	public void cancel() {
-		completionFuture.cancel(false);
 		canceled = true;
+		completionFuture.cancel(false);
 	}
 
 	/**


Mime
View raw message