flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7507] [dispatcher] Fence Dispatcher
Date Mon, 04 Sep 2017 06:49:02 GMT
Repository: flink
Updated Branches:
  refs/heads/master 64e8de97d -> 84c2a928c


[FLINK-7507] [dispatcher] Fence Dispatcher

Let the Dispatcher extend the FencedRpcEndpoint and introduce DispatcherId which
replaces the UUID as leader id/fencing token.

This closes #4584.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84c2a928
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84c2a928
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84c2a928

Branch: refs/heads/master
Commit: 84c2a928cc4c5ee612bf57a1b944362d9114c92c
Parents: 64e8de9
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Aug 24 19:16:10 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Sep 4 08:48:24 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 44 +++++----------
 .../runtime/dispatcher/DispatcherGateway.java   |  7 +--
 .../flink/runtime/dispatcher/DispatcherId.java  | 57 ++++++++++++++++++++
 .../runtime/dispatcher/DispatcherTest.java      |  2 +-
 4 files changed, 74 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e7e1ec2..00cbb2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -40,7 +39,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.util.ExceptionUtils;
@@ -60,7 +59,7 @@ import java.util.concurrent.CompletableFuture;
  * the jobs and to recover them in case of a master failure. Furthermore, it knows
  * about the state of the Flink session cluster.
  */
-public abstract class Dispatcher extends RpcEndpoint implements DispatcherGateway, LeaderContender {
+public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements DispatcherGateway, LeaderContender {
 
 	public static final String DISPATCHER_NAME = "dispatcher";
 
@@ -80,8 +79,6 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 
 	private final LeaderElectionService leaderElectionService;
 
-	private volatile UUID leaderSessionId;
-
 	protected Dispatcher(
 			RpcService rpcService,
 			String endpointId,
@@ -91,7 +88,7 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
-		super(rpcService, endpointId);
+		super(rpcService, endpointId, DispatcherId.generate());
 
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
@@ -106,9 +103,6 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 		jobManagerRunners = new HashMap<>(16);
 
 		leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
-
-		// we are not the leader when this object is created
-		leaderSessionId = null;
 	}
 
 	//------------------------------------------------------
@@ -156,13 +150,7 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 	//------------------------------------------------------
 
 	@Override
-	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, UUID leaderSessionId, Time timeout) {
-
-		try {
-			validateLeaderSessionId(leaderSessionId);
-		} catch (LeaderIdMismatchException e) {
-			return FutureUtils.completedExceptionally(e);
-		}
+	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
 
 		final JobID jobId = jobGraph.getJobID();
 
@@ -274,8 +262,6 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 	private void recoverJobs() {
 		log.info("Recovering all persisted jobs.");
 
-		final UUID currentLeaderSessionId = leaderSessionId;
-
 		getRpcService().execute(
 			() -> {
 				final Collection<JobID> jobIds;
@@ -291,7 +277,7 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 					try {
 						SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
 
-						runAsync(() -> submitJob(submittedJobGraph.getJobGraph(), currentLeaderSessionId, RpcUtils.INF_TIMEOUT));
+						runAsync(() -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT));
 					} catch (Exception e) {
 						log.error("Could not recover the job graph for " + jobId + '.', e);
 					}
@@ -304,12 +290,6 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 		fatalErrorHandler.onFatalError(throwable);
 	}
 
-	private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException {
-		if (this.leaderSessionId == null || !this.leaderSessionId.equals(leaderSessionID)) {
-			throw new LeaderIdMismatchException(this.leaderSessionId, leaderSessionID);
-		}
-	}
-
 	protected abstract JobManagerRunner createJobManagerRunner(
 		ResourceID resourceId,
 		JobGraph jobGraph,
@@ -333,16 +313,18 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		runAsync(
+		runAsyncWithoutFencing(
 			() -> {
-				log.info("Dispatcher {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
+				final DispatcherId dispatcherId = new DispatcherId(newLeaderSessionID);
+
+				log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), dispatcherId);
 
 				// clear the state if we've been the leader before
-				if (leaderSessionId != null) {
+				if (getFencingToken() != null) {
 					clearState();
 				}
 
-				leaderSessionId = newLeaderSessionID;
+				setFencingToken(dispatcherId);
 
 				// confirming the leader session ID might be blocking,
 				getRpcService().execute(
@@ -357,10 +339,12 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 	 */
 	@Override
 	public void revokeLeadership() {
-		runAsync(
+		runAsyncWithoutFencing(
 			() -> {
 				log.info("Dispatcher {} was revoked leadership.", getAddress());
 				clearState();
+
+				setFencingToken(DispatcherId.generate());
 			});
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 669f616..09254c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,29 +22,26 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 
 import java.util.Collection;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Gateway for the Dispatcher component.
  */
-public interface DispatcherGateway extends RpcGateway {
+public interface DispatcherGateway extends FencedRpcGateway<DispatcherId> {
 
 	/**
 	 * Submit a job to the dispatcher.
 	 *
 	 * @param jobGraph JobGraph to submit
-	 * @param leaderSessionId leader session id
 	 * @param timeout RPC timeout
 	 * @return A future acknowledge if the submission succeeded
 	 */
 	CompletableFuture<Acknowledge> submitJob(
 		JobGraph jobGraph,
-		UUID leaderSessionId,
 		@RpcTimeout Time timeout);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
new file mode 100644
index 0000000..e563090
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.util.AbstractID;
+
+import java.util.UUID;
+
+/**
+ * Fencing token of the {@link Dispatcher}.
+ */
+public class DispatcherId extends AbstractID {
+
+	private static final long serialVersionUID = -1654056277003743966L;
+
+	public DispatcherId(byte[] bytes) {
+		super(bytes);
+	}
+
+	public DispatcherId(long lowerPart, long upperPart) {
+		super(lowerPart, upperPart);
+	}
+
+	public DispatcherId(AbstractID id) {
+		super(id);
+	}
+
+	public DispatcherId() {}
+
+	public DispatcherId(UUID uuid) {
+		this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+	}
+
+	public UUID toUUID() {
+		return new UUID(getUpperPart(), getLowerPart());
+	}
+
+	public static DispatcherId generate() {
+		return new DispatcherId();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 091608c..8846686 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -113,7 +113,7 @@ public class DispatcherTest extends TestLogger {
 
 			DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, HighAvailabilityServices.DEFAULT_LEADER_ID, timeout);
+			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
 
 			acknowledgeFuture.get();
 


Mime
View raw message