flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7651] [flip-6] Delay RetryingRegistration in case of connection error
Date Thu, 21 Sep 2017 07:32:45 GMT
Repository: flink
Updated Branches:
  refs/heads/master c6243b8b1 -> 9b0ba7ba3


[FLINK-7651] [flip-6] Delay RetryingRegistration in case of connection error

Similar to a registration error, we should also delay the retrying registration in case of a
connection error, which could happen if the remote endpoint has not been started yet.

This closes #4686.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b0ba7ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b0ba7ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b0ba7ba

Branch: refs/heads/master
Commit: 9b0ba7ba3fdcf63c8ba21545772fbb950383026e
Parents: c6243b8
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Sep 20 12:04:58 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Sep 21 09:32:23 2017 +0200

----------------------------------------------------------------------
 .../registration/RetryingRegistration.java      | 24 ++++++++++++++++----
 .../registration/RetryingRegistrationTest.java  | 23 ++++++++++++++++---
 2 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b0ba7ba/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index ce4a798..378ac6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -151,6 +151,7 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
 	 * Cancels the registration procedure.
 	 */
 	public void cancel() {
+		completionFuture.cancel(false);
 		canceled = true;
 	}
 
@@ -175,6 +176,11 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
 	 */
 	@SuppressWarnings("unchecked")
 	public void startRegistration() {
+		if (canceled) {
+			// we already got canceled
+			return;
+		}
+
 		try {
 			// trigger resolution of the resource manager address to a callable gateway
 			final CompletableFuture<G> resourceManagerFuture;
@@ -199,16 +205,17 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
 			// upon failure, retry, unless this is cancelled
 			resourceManagerAcceptFuture.whenCompleteAsync(
 				(Void v, Throwable failure) -> {
-					if (failure != null && !isCanceled()) {
-						log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress,
failure);
-						startRegistration();
+					if (failure != null && !canceled) {
+						log.warn("Could not resolve {} address {}, retrying in {} ms", targetName, targetAddress,
delayOnError, failure);
+
+						startRegistrationLater(delayOnError);
 					}
 				},
 				rpcService.getExecutor());
 		}
 		catch (Throwable t) {
-			cancel();
 			completionFuture.completeExceptionally(t);
+			cancel();
 		}
 	}
 
@@ -280,8 +287,8 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
 				rpcService.getExecutor());
 		}
 		catch (Throwable t) {
-			cancel();
 			completionFuture.completeExceptionally(t);
+			cancel();
 		}
 	}
 
@@ -293,4 +300,11 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
 			}
 		}, delay, TimeUnit.MILLISECONDS);
 	}
+
+	private void startRegistrationLater(final long delay) {
+		rpcService.scheduleRunnable(
+			this::startRegistration,
+			delay,
+			TimeUnit.MILLISECONDS);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b0ba7ba/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index ac0dbc5..7fc6897 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -22,16 +22,17 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -115,7 +116,7 @@ public class RetryingRegistrationTest extends TestLogger {
 		final String testId = "laissez les bon temps roulez";
 		final UUID leaderId = UUID.randomUUID();
 
-		ExecutorService executor = Executors.newCachedThreadPool();
+		ExecutorService executor = TestingUtils.defaultExecutor();
 		TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
 
 		try {
@@ -126,20 +127,36 @@ public class RetryingRegistrationTest extends TestLogger {
 					CompletableFuture.completedFuture(testGateway)                         // second connection
attempt succeeds
 			);
 			when(rpc.getExecutor()).thenReturn(executor);
+			when(rpc.scheduleRunnable(any(Runnable.class), anyLong(), any(TimeUnit.class))).thenAnswer(
+				(InvocationOnMock invocation) -> {
+					final Runnable runnable = invocation.getArgumentAt(0, Runnable.class);
+					final long delay = invocation.getArgumentAt(1, Long.class);
+					final TimeUnit timeUnit = invocation.getArgumentAt(2, TimeUnit.class);
+					return TestingUtils.defaultScheduledExecutor().schedule(runnable, delay, timeUnit);
+				});
 
 			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address",
leaderId);
+
+			long start = System.currentTimeMillis();
+
 			registration.startRegistration();
 
 			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
 				registration.getFuture().get(10L, TimeUnit.SECONDS);
 
+			// measure the duration of the registration --> should be longer than the error delay
+			long duration = System.currentTimeMillis() - start;
+
+			assertTrue(
+				"The registration should have failed the first time. Thus the duration should be longer
than at least a single error delay.",
+				duration > TestRetryingRegistration.DELAY_ON_ERROR);
+
 			// validate correct invocation and result
 			assertEquals(testId, success.f1.getCorrelationId());
 			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
 		}
 		finally {
 			testGateway.stop();
-			executor.shutdown();
 		}
 	}
 


Mime
View raw message