flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7655] [flip6] Set fencing token to null if not leader
Date Thu, 21 Sep 2017 22:59:34 GMT
Repository: flink
Updated Branches:
  refs/heads/master a86b64686 -> 42cc3a2a9


[FLINK-7655] [flip6] Set fencing token to null if not leader

This commit changes the fencing behaviour such that a component which is not the
leader sets its fencing token to null. This distinction makes it possible to report
different errors depending on whether there is a token mismatch or whether the
receiver has no fencing token set (i.e. it is not the leader).

This closes #4689.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42cc3a2a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42cc3a2a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42cc3a2a

Branch: refs/heads/master
Commit: 42cc3a2a9c41dda7cf338db36b45131db9150674
Parents: a86b646
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Sep 20 17:39:35 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Sep 22 00:59:07 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  5 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 10 +--
 .../flink/runtime/jobmaster/JobMasterId.java    |  2 -
 .../resourcemanager/ResourceManager.java        |  8 +--
 .../flink/runtime/rpc/FencedRpcEndpoint.java    | 16 +++--
 .../runtime/rpc/akka/FencedAkkaRpcActor.java    | 37 ++++++----
 .../rpc/exceptions/FencingTokenException.java   | 42 +++++++++++
 .../FencingTokenMismatchException.java          | 42 -----------
 .../rpc/messages/LocalFencedMessage.java        |  6 +-
 .../rpc/messages/RemoteFencedMessage.java       |  6 +-
 .../resourcemanager/ResourceManagerHATest.java  |  2 +-
 .../ResourceManagerJobMasterTest.java           | 20 +++++-
 .../ResourceManagerTaskExecutorTest.java        |  4 +-
 .../flink/runtime/rpc/AsyncCallsTest.java       | 15 +++-
 .../runtime/rpc/FencedRpcEndpointTest.java      | 75 ++++++++++++--------
 15 files changed, 171 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 6b9999c..153ee53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -94,7 +94,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler,
 			Optional<String> restAddress) throws Exception {
-		super(rpcService, endpointId, DispatcherId.generate());
+		super(rpcService, endpointId);
 
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
@@ -399,7 +399,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 					log.warn("Could not properly clear the Dispatcher state while revoking leadership.",
e);
 				}
 
-				setFencingToken(DispatcherId.generate());
+				// clear the fencing token indicating that we don't have the leadership right now
+				setFencingToken(null);
 			});
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 2bfe277..343fbf6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -212,7 +212,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements
JobMast
 			FatalErrorHandler errorHandler,
 			ClassLoader userCodeLoader) throws Exception {
 
-		super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME), JobMasterId.INITIAL_JOB_MASTER_ID);
+		super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
 
 		selfGateway = getSelfGateway(JobMasterGateway.class);
 
@@ -735,7 +735,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements
JobMast
 			return Acknowledge.get();
 		}
 
-		if (!Objects.equals(getFencingToken(), JobMasterId.INITIAL_JOB_MASTER_ID)) {
+		if (getFencingToken() != null) {
 			log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", getFencingToken(),
newJobMasterId);
 
 			// first we have to suspend the current execution
@@ -791,13 +791,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
implements JobMast
 	private Acknowledge suspendExecution(final Throwable cause) {
 		validateRunsInMainThread();
 
-		if (Objects.equals(JobMasterId.INITIAL_JOB_MASTER_ID, getFencingToken())) {
+		if (getFencingToken() == null) {
 			log.debug("Job has already been suspended or shutdown.");
 			return Acknowledge.get();
 		}
 
-		// not leader anymore --> set the JobMasterId to the initial id
-		setFencingToken(JobMasterId.INITIAL_JOB_MASTER_ID);
+		// not leader anymore --> set the JobMasterId to null
+		setFencingToken(null);
 
 		try {
 			resourceManagerLeaderRetriever.stop();

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
index ffd53b3..39f7ded 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
@@ -29,8 +29,6 @@ public class JobMasterId extends AbstractID {
 
 	private static final long serialVersionUID = -933276753644003754L;
 
-	public static final JobMasterId INITIAL_JOB_MASTER_ID = new JobMasterId(0L, 0L);
-
 	public JobMasterId(byte[] bytes) {
 		super(bytes);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 87cf7d1..f69998c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -139,7 +139,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 
-		super(rpcService, resourceManagerEndpointId, ResourceManagerId.generate());
+		super(rpcService, resourceManagerEndpointId);
 
 		this.resourceId = checkNotNull(resourceId);
 		this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
@@ -772,13 +772,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	public void revokeLeadership() {
 		runAsyncWithoutFencing(
 			() -> {
-				final ResourceManagerId newResourceManagerId = ResourceManagerId.generate();
-
-				log.info("ResourceManager {} was revoked leadership. Setting fencing token to {}.", getAddress(),
newResourceManagerId);
+				log.info("ResourceManager {} was revoked leadership. Clearing fencing token.", getAddress());
 
 				clearState();
 
-				setFencingToken(newResourceManagerId);
+				setFencingToken(null);
 
 				slotManager.suspend();
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
index 81bae29..ff74f47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
@@ -19,7 +19,8 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.UUID;
@@ -39,25 +40,26 @@ public class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint
{
 	private volatile F fencingToken;
 	private volatile MainThreadExecutor fencedMainThreadExecutor;
 
-	protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F initialFencingToken)
{
+	protected FencedRpcEndpoint(RpcService rpcService, String endpointId) {
 		super(rpcService, endpointId);
 
-		this.fencingToken = Preconditions.checkNotNull(initialFencingToken);
+		// no fencing token == no leadership
+		this.fencingToken = null;
 		this.fencedMainThreadExecutor = new MainThreadExecutor(
 			getRpcService().fenceRpcServer(
 				rpcServer,
-				initialFencingToken));
+				null));
 	}
 
-	protected FencedRpcEndpoint(RpcService rpcService, F initialFencingToken) {
-		this(rpcService, UUID.randomUUID().toString(), initialFencingToken);
+	protected FencedRpcEndpoint(RpcService rpcService) {
+		this(rpcService, UUID.randomUUID().toString());
 	}
 
 	public F getFencingToken() {
 		return fencingToken;
 	}
 
-	protected void setFencingToken(F newFencingToken) {
+	protected void setFencingToken(@Nullable F newFencingToken) {
 		// this method should only be called from within the main thread
 		validateRunsInMainThread();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index 1ace3b7..369af6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.messages.FencedMessage;
 import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
@@ -45,23 +45,36 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo
 	@Override
 	protected void handleMessage(Object message) {
 		if (message instanceof FencedMessage) {
-			@SuppressWarnings("unchecked")
-			FencedMessage<F, ?> fencedMessage = ((FencedMessage<F, ?>) message);
 
-			F fencingToken = fencedMessage.getFencingToken();
+			final F expectedFencingToken = rpcEndpoint.getFencingToken();
 
-			if (Objects.equals(rpcEndpoint.getFencingToken(), fencingToken)) {
-				super.handleMessage(fencedMessage.getPayload());
-			} else {
+			if (expectedFencingToken == null) {
 				if (log.isDebugEnabled()) {
-					log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {}
did " +
-						"not match the expected fencing token {}.", message, fencingToken, rpcEndpoint.getFencingToken());
+					log.debug("Fencing token not set: Ignoring message {} because the fencing token is null.",
message);
 				}
 
 				sendErrorIfSender(
-					new FencingTokenMismatchException("Fencing token mismatch: Ignoring message " + message
+
-						" because the fencing token " + fencingToken + " did not match the expected fencing
token " +
-						rpcEndpoint.getFencingToken() + '.'));
+					new FencingTokenException(
+						"Fencing token not set: Ignoring message " + message + " because the fencing token
is null."));
+			} else {
+				@SuppressWarnings("unchecked")
+				FencedMessage<F, ?> fencedMessage = ((FencedMessage<F, ?>) message);
+
+				F fencingToken = fencedMessage.getFencingToken();
+
+				if (Objects.equals(expectedFencingToken, fencingToken)) {
+					super.handleMessage(fencedMessage.getPayload());
+				} else {
+					if (log.isDebugEnabled()) {
+						log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {}
did " +
+							"not match the expected fencing token {}.", message, fencingToken, expectedFencingToken);
+					}
+
+					sendErrorIfSender(
+						new FencingTokenException("Fencing token mismatch: Ignoring message " + message +
+							" because the fencing token " + fencingToken + " did not match the expected fencing
token " +
+							expectedFencingToken + '.'));
+				}
 			}
 		} else if (message instanceof UnfencedMessage) {
 			super.handleMessage(((UnfencedMessage<?>) message).getPayload());

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java
new file mode 100644
index 0000000..71520c8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
+
+/**
+ * Exception which is thrown if the fencing tokens of a {@link FencedRpcEndpoint} do
+ * not match, or if the receiving endpoint has no fencing token set (i.e. it is not the leader).
+ */
+public class FencingTokenException extends RpcException {
+	private static final long serialVersionUID = -500634972988881467L;
+
+	public FencingTokenException(String message) {
+		super(message);
+	}
+
+	public FencingTokenException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public FencingTokenException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
deleted file mode 100644
index 9a59101..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.exceptions;
-
-import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
-import org.apache.flink.runtime.rpc.exceptions.RpcException;
-
-/**
- * Exception which is thrown if the fencing tokens of a {@link FencedRpcEndpoint} do
- * not match.
- */
-public class FencingTokenMismatchException extends RpcException {
-	private static final long serialVersionUID = -500634972988881467L;
-
-	public FencingTokenMismatchException(String message) {
-		super(message);
-	}
-
-	public FencingTokenMismatchException(String message, Throwable cause) {
-		super(message, cause);
-	}
-
-	public FencingTokenMismatchException(Throwable cause) {
-		super(cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
index 2481065..0ee4940 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.rpc.messages;
 
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 /**
@@ -34,8 +36,8 @@ public class LocalFencedMessage<F extends Serializable, P> implements
FencedMess
 	private final F fencingToken;
 	private final P payload;
 
-	public LocalFencedMessage(F fencingToken, P payload) {
-		this.fencingToken = Preconditions.checkNotNull(fencingToken);
+	public LocalFencedMessage(@Nullable F fencingToken, P payload) {
+		this.fencingToken = fencingToken;
 		this.payload = Preconditions.checkNotNull(payload);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
index 5cf9b98..ad8c349 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.rpc.messages;
 
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 /**
@@ -35,8 +37,8 @@ public class RemoteFencedMessage<F extends Serializable, P extends Serializable>
 	private final F fencingToken;
 	private final P payload;
 
-	public RemoteFencedMessage(F fencingToken, P payload) {
-		this.fencingToken = Preconditions.checkNotNull(fencingToken);
+	public RemoteFencedMessage(@Nullable F fencingToken, P payload) {
+		this.fencingToken = fencingToken;
 		this.payload = Preconditions.checkNotNull(payload);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 2b8792b..d0dd973 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -108,7 +108,7 @@ public class ResourceManagerHATest extends TestLogger {
 		try {
 			resourceManager.start();
 
-			Assert.assertNotNull(resourceManager.getFencingToken());
+			Assert.assertNull(resourceManager.getFencingToken());
 			final UUID leaderId = UUID.randomUUID();
 			leaderElectionService.isLeader(leaderId);
 			// after grant leadership, resourceManager's leaderId has value

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 156bc73..73c5b5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
@@ -47,6 +47,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -81,10 +82,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		JobMasterId jobMasterId = JobMasterId.generate();
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress,
jobMasterId.toUUID());
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager<?> resourceManager = createAndStartResourceManager(mock(LeaderElectionService.class),
jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService,
jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 
+		// wait until the leader election has been completed
+		resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
 		// test response successful
 		CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager(
 			jobMasterId,
@@ -127,7 +132,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS);
 			fail("Should fail because we are using the wrong fencing token.");
 		} catch (ExecutionException e) {
-			assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+			assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
 		}
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -151,6 +156,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
+		// wait until the leader election has been completed
+		resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
 		// test throw exception when receive a registration from job master which takes unmatched
leaderSessionId
 		JobMasterId differentJobMasterId = JobMasterId.generate();
 		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = rmGateway.registerJobManager(
@@ -182,6 +190,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
+		// wait until the leader election has been completed
+		resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
 		// test throw exception when receive a registration from job master which takes invalid
address
 		String invalidAddress = "/jobMasterAddress2";
 		CompletableFuture<RegistrationResponse> invalidAddressFuture = rmGateway.registerJobManager(
@@ -219,6 +230,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 
 		JobID unknownJobIDToHAServices = new JobID();
 
+		// wait until the leader election has been completed
+		resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
 		// this should fail because we try to register a job leader listener for an unknown job
id
 		CompletableFuture<RegistrationResponse> registrationFuture = rmGateway.registerJobManager(
 			new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID),

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 8add168..0206ade 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
@@ -134,7 +134,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 				unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 				fail("Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
 			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
 			}
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index f8eca16..9fe9904 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -231,7 +231,7 @@ public class AsyncCallsTest extends TestLogger {
 
 			fail("The async call operation should fail due to the changed fencing token.");
 		} catch (ExecutionException e) {
-			assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+			assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
 		}
 	}
 
@@ -346,10 +346,19 @@ public class AsyncCallsTest extends TestLogger {
 				UUID initialFencingToken,
 				OneShotLatch enteringSetNewFencingToken,
 				OneShotLatch triggerSetNewFencingToken) {
-			super(rpcService, initialFencingToken);
+			super(rpcService);
 
 			this.enteringSetNewFencingToken = enteringSetNewFencingToken;
 			this.triggerSetNewFencingToken = triggerSetNewFencingToken;
+
+			// make it look as if we are running in the main thread
+			currentMainThread.set(Thread.currentThread());
+
+			try {
+				setFencingToken(initialFencingToken);
+			} finally {
+				currentMainThread.set(null);
+			}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
index 62d5354..6162a2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.rpc.exceptions.RpcException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -67,17 +68,15 @@ public class FencedRpcEndpointTest extends TestLogger {
 	 */
 	@Test
 	public void testFencingTokenSetting() throws Exception {
-		final UUID initialFencingToken = UUID.randomUUID();
 		final String value = "foobar";
-		FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken,
value);
-		FencedTestingGateway fencedTestingGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
+		FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value);
 		FencedTestingGateway fencedGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
 
 		try {
 			fencedTestingEndpoint.start();
 
-			assertEquals(initialFencingToken, fencedGateway.getFencingToken());
-			assertEquals(initialFencingToken, fencedTestingEndpoint.getFencingToken());
+			assertNull(fencedGateway.getFencingToken());
+			assertNull(fencedTestingEndpoint.getFencingToken());
 
 			final UUID newFencingToken = UUID.randomUUID();
 
@@ -88,9 +87,9 @@ public class FencedRpcEndpointTest extends TestLogger {
 				// expected to fail
 			}
 
-			assertEquals(initialFencingToken, fencedTestingEndpoint.getFencingToken());
+			assertNull(fencedTestingEndpoint.getFencingToken());
 
-			CompletableFuture<Acknowledge> setFencingFuture = fencedTestingGateway.rpcSetFencingToken(newFencingToken,
timeout);
+			CompletableFuture<Acknowledge> setFencingFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken,
timeout);
 
 			// wait for the completion of the set fencing token operation
 			setFencingFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
@@ -109,15 +108,15 @@ public class FencedRpcEndpointTest extends TestLogger {
 	 */
 	@Test
 	public void testFencing() throws Exception {
-		final UUID initialFencingToken = UUID.randomUUID();
+		final UUID fencingToken = UUID.randomUUID();
 		final UUID wrongFencingToken = UUID.randomUUID();
 		final String value = "barfoo";
-		FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken,
value);
+		FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value,
fencingToken);
 
 		try {
 			fencedTestingEndpoint.start();
 
-			final FencedTestingGateway properFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(),
initialFencingToken, FencedTestingGateway.class)
+			final FencedTestingGateway properFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(),
fencingToken, FencedTestingGateway.class)
 				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			final FencedTestingGateway wronglyFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(),
wrongFencingToken, FencedTestingGateway.class)
 				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
@@ -128,12 +127,12 @@ public class FencedRpcEndpointTest extends TestLogger {
 				wronglyFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 				fail("This should fail since we have the wrong fencing token.");
 			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
 			}
 
 			final UUID newFencingToken = UUID.randomUUID();
 
-			CompletableFuture<Acknowledge> newFencingTokenFuture = properFencedGateway.rpcSetFencingToken(newFencingToken,
timeout);
+			CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken,
timeout);
 
 			// wait for the new fencing token to be set
 			newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
@@ -144,7 +143,7 @@ public class FencedRpcEndpointTest extends TestLogger {
 
 				fail("This should fail since we have the wrong fencing token by now.");
 			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
 			}
 
 		} finally {
@@ -163,7 +162,7 @@ public class FencedRpcEndpointTest extends TestLogger {
 		final UUID newFencingToken = UUID.randomUUID();
 		final String value = "foobar";
 
-		final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService,
initialFencingToken, value);
+		final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService,
value, initialFencingToken);
 
 		try {
 			fencedTestingEndpoint.start();
@@ -178,7 +177,7 @@ public class FencedRpcEndpointTest extends TestLogger {
 			assertEquals(value, selfGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
 			assertEquals(value, remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
 
-			CompletableFuture<Acknowledge> newFencingTokenFuture = selfGateway.rpcSetFencingToken(newFencingToken,
timeout);
+			CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken,
timeout);
 
 			// wait for the new fencing token to be set
 			newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
@@ -192,7 +191,7 @@ public class FencedRpcEndpointTest extends TestLogger {
 				remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 				fail("This should have failed because we don't have the right fencing token.");
 			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
 			}
 		} finally {
 			fencedTestingEndpoint.shutDown();
@@ -208,7 +207,7 @@ public class FencedRpcEndpointTest extends TestLogger {
 		final Time shortTimeout = Time.milliseconds(100L);
 		final UUID initialFencingToken = UUID.randomUUID();
 		final String value = "foobar";
-		final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService,
initialFencingToken, value);
+		final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService,
value, initialFencingToken);
 
 		try {
 			fencedTestingEndpoint.start();
@@ -221,7 +220,7 @@ public class FencedRpcEndpointTest extends TestLogger {
 			// therefore, we know that the change fencing token call is executed after the trigger
MainThreadExecutor
 			// computation
 			final UUID newFencingToken = UUID.randomUUID();
-			CompletableFuture<Acknowledge> newFencingTokenFuture = selfGateway.rpcSetFencingToken(newFencingToken,
timeout);
+			CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken,
timeout);
 
 			newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
@@ -253,7 +252,7 @@ public class FencedRpcEndpointTest extends TestLogger {
 		final UUID initialFencingToken = UUID.randomUUID();
 		final String value = "foobar";
 
-		final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService,
initialFencingToken, value);
+		final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService,
value, initialFencingToken);
 
 		try {
 			fencedTestingEndpoint.start();
@@ -283,8 +282,6 @@ public class FencedRpcEndpointTest extends TestLogger {
 	public interface FencedTestingGateway extends FencedRpcGateway<UUID> {
 		CompletableFuture<String> foobar(@RpcTimeout Time timeout);
 
-		CompletableFuture<Acknowledge> rpcSetFencingToken(UUID fencingToken, @RpcTimeout
Time timeout);
-
 		CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(@RpcTimeout Time
timeout);
 
 		CompletableFuture<Acknowledge> triggerComputationLatch(@RpcTimeout Time timeout);
@@ -296,12 +293,25 @@ public class FencedRpcEndpointTest extends TestLogger {
 
 		private final String value;
 
-		protected FencedTestingEndpoint(RpcService rpcService, UUID initialFencingToken, String
value) {
-			super(rpcService, initialFencingToken);
+		protected FencedTestingEndpoint(RpcService rpcService, String value) {
+			this(rpcService, value, null);
+		}
+
+		protected FencedTestingEndpoint(RpcService rpcService, String value, UUID initialFencingToken)
{
+			super(rpcService);
 
 			computationLatch = new OneShotLatch();
 
 			this.value = value;
+
+			// make sure that it looks as if we are running in the main thread
+			currentMainThread.set(Thread.currentThread());
+
+			try {
+				setFencingToken(initialFencingToken);
+			} finally {
+				currentMainThread.set(null);
+			}
 		}
 
 		@Override
@@ -310,13 +320,6 @@ public class FencedRpcEndpointTest extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<Acknowledge> rpcSetFencingToken(UUID fencingToken, Time
timeout) {
-			setFencingToken(fencingToken);
-
-			return CompletableFuture.completedFuture(Acknowledge.get());
-		}
-
-		@Override
 		public CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(Time timeout)
{
 			return CompletableFuture.supplyAsync(
 				() -> {
@@ -340,5 +343,15 @@ public class FencedRpcEndpointTest extends TestLogger {
 
 			return CompletableFuture.completedFuture(Acknowledge.get());
 		}
+
+		public CompletableFuture<Acknowledge> setFencingTokenInMainThread(UUID fencingToken,
Time timeout) {
+			return callAsyncWithoutFencing(
+				() -> {
+					setFencingToken(fencingToken);
+
+					return Acknowledge.get();
+				},
+				timeout);
+		}
 	}
 }


Mime
View raw message