flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject flink git commit: [FLINK-4516] update leadership information in ResourceManager
Date Thu, 13 Oct 2016 14:19:10 GMT
Repository: flink
Updated Branches:
  refs/heads/flip-6 64ee13862 -> 35a44daa6


[FLINK-4516] update leadership information in ResourceManager

The leadership information for connected JobMasters remained static.
This commit updates it so that stale JobMasters are removed when they
lose leadership.

This closes #2624


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35a44daa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35a44daa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35a44daa

Branch: refs/heads/flip-6
Commit: 35a44daa6c1ae4315e854352a0c020e43a834fc4
Parents: 64ee138
Author: Maximilian Michels <mxm@apache.org>
Authored: Mon Oct 10 17:36:10 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Thu Oct 13 16:14:25 2016 +0200

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        | 196 +++++++++++++------
 .../resourcemanager/ResourceManagerGateway.java |   4 +-
 .../ResourceManagerServices.java                |   6 +
 .../registration/JobMasterRegistration.java     |  62 ++++++
 .../slotmanager/SlotManager.java                |  16 +-
 .../resourcemanager/TestingSlotManager.java     |   8 +
 .../slotmanager/SlotManagerTest.java            |  10 +-
 7 files changed, 224 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35a44daa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index d2d00cf..8fbb34b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
@@ -40,6 +41,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
+import org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration;
 import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
@@ -53,17 +55,14 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.util.LeaderConnectionInfo;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -85,10 +84,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	protected static final int EXIT_CODE_FATAL_ERROR = -13;
 
 	/** All currently registered JobMasterGateways scoped by JobID. */
-	private final Map<JobID, JobMasterGateway> jobMasterGateways;
+	private final Map<JobID, JobMasterRegistration> jobMasters;
 
-	/** LeaderListeners for all registered JobMasters. */
-	private final Map<JobID, JobMasterLeaderListener> jobMasterLeaderRetrievalListeners;
+	/** LeaderListeners for all registered JobIDs. */
+	private final Map<JobID, JobIdLeaderListener> leaderListeners;
 
 	/** All currently registered TaskExecutors with there framework specific worker information.
*/
 	private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
@@ -106,7 +105,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private LeaderElectionService leaderElectionService;
 
 	/** ResourceManager's leader session id which is updated on leader election. */
-	private UUID leaderSessionID;
+	private volatile UUID leaderSessionID;
 
 	/** All registered listeners for status updates of the ResourceManager. */
 	private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
@@ -121,8 +120,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.slotManagerFactory = checkNotNull(slotManagerFactory);
-		this.jobMasterGateways = new HashMap<>();
-		this.jobMasterLeaderRetrievalListeners = new HashMap<>();
+		this.jobMasters = new HashMap<>();
+		this.leaderListeners = new HashMap<>();
 		this.taskExecutors = new HashMap<>();
 		this.leaderSessionID = new UUID(0, 0);
 		infoMessageListeners = new HashMap<>();
@@ -149,9 +148,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	public void shutDown() {
 		try {
 			leaderElectionService.stop();
-			for (JobID jobID : jobMasterGateways.keySet()) {
-				highAvailabilityServices.getJobManagerLeaderRetriever(jobID).stop();
-			}
+			clearState();
 			super.shutDown();
 		} catch (Throwable e) {
 			log.error("A fatal error happened when shutdown the ResourceManager", e);
@@ -185,6 +182,24 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		checkNotNull(jobMasterAddress);
 		checkNotNull(jobID);
 
+		// create a leader retriever in case it doesn't exist
+		final JobIdLeaderListener jobIdLeaderListener;
+		if (leaderListeners.containsKey(jobID)) {
+			jobIdLeaderListener = leaderListeners.get(jobID);
+		} else {
+			try {
+				LeaderRetrievalService jobMasterLeaderRetriever =
+					highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
+				jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
+			} catch (Exception e) {
+				log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+				FlinkCompletableFuture<RegistrationResponse> responseFuture = new FlinkCompletableFuture<>();
+				responseFuture.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
+				return responseFuture;
+			}
+			leaderListeners.put(jobID, jobIdLeaderListener);
+		}
+
 		return getRpcService()
 			.execute(new Callable<JobMasterGateway>() {
 				@Override
@@ -197,21 +212,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 						throw new Exception("Invalid leader session id");
 					}
 
-					final LeaderConnectionInfo jobMasterLeaderInfo;
-					try {
-						jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
-							highAvailabilityServices.getJobManagerLeaderRetriever(jobID), new FiniteDuration(5,
TimeUnit.SECONDS));
-					} catch (Exception e) {
-						log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
-						throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
-					}
-
-					if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
-						log.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
-						throw new Exception("JobManager is not leading");
+					if (!jobIdLeaderListener.getLeaderID().get(timeout.getSize(), timeout.getUnit())
+							.equals(jobMasterLeaderId)) {
+						throw new Exception("Leader Id did not match");
 					}
 
-					return getRpcService().connect(jobMasterAddress, JobMasterGateway.class).get(5, TimeUnit.SECONDS);
+					return getRpcService().connect(jobMasterAddress, JobMasterGateway.class)
+						.get(timeout.getSize(), timeout.getUnit());
 				}
 			})
 			.handleAsync(new BiFunction<JobMasterGateway, Throwable, RegistrationResponse>()
{
@@ -220,24 +227,34 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 					if (throwable != null) {
 						return new RegistrationResponse.Decline(throwable.getMessage());
-					} else {
-						if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
-							JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
-							try {
-								LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
-								jobMasterLeaderRetriever.start(jobMasterLeaderListener);
-							} catch (Exception e) {
-								log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
-								return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
-							}
-							jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener);
-						}
-						final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
-						if (existingGateway != null) {
-							log.info("Replacing gateway for registered JobID {}.", jobID);
+					}
+
+					if (!leaderSessionID.equals(resourceManagerLeaderId)) {
+						log.warn("Discard registration from JobMaster {} at ({}) because the expected leader
session ID {}" +
+								" did not equal the received leader session ID  {}",
+							jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
+						return new RegistrationResponse.Decline("Invalid leader session id");
+					}
+
+					try {
+						// LeaderID should be available now, but if not we fail the registration
+						UUID currentJobMasterLeaderId = jobIdLeaderListener.getLeaderID().getNow(null);
+						if (currentJobMasterLeaderId == null || !currentJobMasterLeaderId.equals(jobMasterLeaderId))
{
+							throw new Exception("Leader Id did not match");
 						}
-						return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
+					} catch (Exception e) {
+						return new RegistrationResponse.Decline(e.getMessage());
+					}
+
+					final JobMasterRegistration registration =
+						new JobMasterRegistration(jobID, jobMasterLeaderId, jobMasterGateway);
+
+					final JobMasterRegistration existingRegistration = jobMasters.put(jobID, registration);
+					if (existingRegistration != null) {
+						log.info("Replacing JobMaster registration for newly registered JobMaster with JobID
{}.", jobID);
 					}
+					return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
+
 				}
 			}, getMainThreadExecutor());
 	}
@@ -305,13 +322,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			SlotRequest slotRequest) {
 
 		JobID jobId = slotRequest.getJobId();
-		JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
-		JobMasterLeaderListener jobMasterLeaderListener = jobMasterLeaderRetrievalListeners.get(jobId);
+		JobMasterRegistration jobMasterRegistration = jobMasters.get(jobId);
 
-		UUID leaderID = jobMasterLeaderListener.getLeaderID();
-
-		if (jobMasterGateway != null
-				&& jobMasterLeaderID.equals(leaderID)
+		if (jobMasterRegistration != null
+				&& jobMasterLeaderID.equals(jobMasterRegistration.getLeaderID())
 				&& resourceManagerLeaderID.equals(leaderSessionID)) {
 			return slotManager.requestSlot(slotRequest);
 		} else {
@@ -371,8 +385,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(),
leaderSessionID);
 				// confirming the leader session ID might be blocking,
 				leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-				// notify SlotManager
-				slotManager.setLeaderUUID(leaderSessionID);
 				ResourceManager.this.leaderSessionID = leaderSessionID;
 			}
 		});
@@ -387,10 +399,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			@Override
 			public void run() {
 				log.info("ResourceManager {} was revoked leadership.", getAddress());
-				jobMasterGateways.clear();
-				taskExecutors.clear();
-				slotManager.clearState();
-				leaderSessionID = new UUID(0, 0);
+				clearState();
 			}
 		});
 	}
@@ -577,6 +586,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private class DefaultResourceManagerServices implements ResourceManagerServices {
 
 		@Override
+		public UUID getLeaderID() {
+			return ResourceManager.this.leaderSessionID;
+		}
+
+		@Override
 		public void allocateResource(ResourceProfile resourceProfile) {
 			ResourceManager.this.startNewWorker(resourceProfile);
 		}
@@ -592,33 +606,95 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		}
 	}
 
-	private static class JobMasterLeaderListener implements LeaderRetrievalListener {
+	/**
+	 * Leader instantiated for each connected JobMaster
+	 */
+	private class JobIdLeaderListener implements LeaderRetrievalListener {
 
 		private final JobID jobID;
-		private UUID leaderID;
+		private final LeaderRetrievalService retrievalService;
 
-		private JobMasterLeaderListener(JobID jobID) {
+		private final FlinkCompletableFuture<UUID> initialLeaderIdFuture;
+
+		private volatile UUID leaderID;
+
+		private JobIdLeaderListener(
+				JobID jobID,
+				LeaderRetrievalService retrievalService) throws Exception {
 			this.jobID = jobID;
+			this.retrievalService = retrievalService;
+			this.initialLeaderIdFuture = new FlinkCompletableFuture<>();
+			this.retrievalService.start(this);
+		}
+
+		public Future<UUID> getLeaderID() {
+			if (!initialLeaderIdFuture.isDone()) {
+				return initialLeaderIdFuture;
+			} else {
+				return FlinkCompletableFuture.completed(leaderID);
+			}
 		}
 
 		public JobID getJobID() {
 			return jobID;
 		}
 
-		public UUID getLeaderID() {
-			return leaderID;
+
+		public void stopService() throws Exception {
+			retrievalService.stop();
 		}
 
 		@Override
 		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID)
{
 			this.leaderID = leaderSessionID;
+
+			if (!initialLeaderIdFuture.isDone()) {
+				initialLeaderIdFuture.complete(leaderSessionID);
+			}
+
+			ResourceManager.this.runAsync(new Runnable() {
+				@Override
+				public void run() {
+					JobMasterRegistration jobMasterRegistration = ResourceManager.this.jobMasters.get(jobID);
+					if (jobMasterRegistration == null || !jobMasterRegistration.getLeaderID().equals(leaderSessionID))
{
+						// registration is not valid anymore, remove registration
+						ResourceManager.this.jobMasters.remove(jobID);
+						// leader listener is not necessary anymore
+						JobIdLeaderListener listener = ResourceManager.this.leaderListeners.remove(jobID);
+						if (listener != null) {
+							try {
+								listener.stopService();
+							} catch (Exception e) {
+								ResourceManager.this.handleError(e);
+							}
+						}
+					}
+				}
+			});
 		}
 
 		@Override
 		public void handleError(final Exception exception) {
-			// TODO
+			ResourceManager.this.handleError(exception);
 		}
 	}
 
+	private void clearState() {
+		jobMasters.clear();
+		taskExecutors.clear();
+		slotManager.clearState();
+		Iterator<JobIdLeaderListener> leaderListenerIterator =
+			leaderListeners.values().iterator();
+		while (leaderListenerIterator.hasNext()) {
+			JobIdLeaderListener listener = leaderListenerIterator.next();
+			try {
+				listener.stopService();
+			} catch (Exception e) {
+				handleError(e);
+			}
+			leaderListenerIterator.remove();
+		}
+		leaderSessionID = new UUID(0, 0);
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35a44daa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 3c81227..07e9e43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -61,14 +61,14 @@ public interface ResourceManagerGateway extends RpcGateway {
 	/**
 	 * Requests a slot from the resource manager.
 	 *
-	 * @param jobMasterLeaderID leader id of the JobMaster
 	 * @param resourceManagerLeaderID leader if of the ResourceMaster
+	 * @param jobMasterLeaderID leader if of the JobMaster
 	 * @param slotRequest The slot to request
 	 * @return The confirmation that the slot gets allocated
 	 */
 	Future<RMSlotRequestReply> requestSlot(
-		UUID jobMasterLeaderID,
 		UUID resourceManagerLeaderID,
+		UUID jobMasterLeaderID,
 		SlotRequest slotRequest,
 		@RpcTimeout Time timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35a44daa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
index b997a3a..16d0a7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 
+import java.util.UUID;
 import java.util.concurrent.Executor;
 
 /**
@@ -27,6 +28,11 @@ import java.util.concurrent.Executor;
 public interface ResourceManagerServices {
 
 	/**
+	 * Gets the current leader id assigned at the ResourceManager.
+	 */
+	UUID getLeaderID();
+
+	/**
 	 * Allocates a resource according to the resource profile.
 	 */
 	void allocateResource(ResourceProfile resourceProfile);

http://git-wip-us.apache.org/repos/asf/flink/blob/35a44daa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
new file mode 100644
index 0000000..f417935
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+
+import java.util.UUID;
+
+/**
+ * This class is responsible for grouping the JobMasterGateway and the JobMaster's
+ * leader id
+ */
+public class JobMasterRegistration {
+
+	private static final long serialVersionUID = -2062957799469434614L;
+
+	private final JobID jobID;
+
+	private final UUID leaderID;
+
+	private final JobMasterGateway jobMasterGateway;
+
+	public JobMasterRegistration(
+			JobID jobID,
+			UUID leaderID,
+			JobMasterGateway jobMasterGateway) {
+		this.jobID = jobID;
+		this.leaderID = leaderID;
+		this.jobMasterGateway = jobMasterGateway;
+	}
+
+	public JobID getJobID() {
+		return jobID;
+	}
+
+
+	public UUID getLeaderID() {
+		return leaderID;
+	}
+
+	public JobMasterGateway getJobMasterGateway() {
+		return jobMasterGateway;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35a44daa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 7eb2d78..e312ea2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -85,9 +84,6 @@ public abstract class SlotManager {
 
 	private final Time timeout;
 
-	/** The current leader id set by the ResourceManager */
-	private UUID leaderID;
-
 	public SlotManager(ResourceManagerServices rmServices) {
 		this.rmServices = checkNotNull(rmServices);
 		this.registeredSlots = new HashMap<>(16);
@@ -96,7 +92,6 @@ public abstract class SlotManager {
 		this.allocationMap = new AllocationMap();
 		this.taskManagers = new HashMap<>();
 		this.timeout = Time.seconds(10);
-		this.leaderID = new UUID(0, 0);
 	}
 
 	// ------------------------------------------------------------------------
@@ -303,7 +298,7 @@ public abstract class SlotManager {
 		final TaskExecutorRegistration registration = freeSlot.getTaskExecutorRegistration();
 		final Future<TMSlotRequestReply> slotRequestReplyFuture =
 			registration.getTaskExecutorGateway()
-				.requestSlot(freeSlot.getSlotId(), allocationID, leaderID, timeout);
+				.requestSlot(freeSlot.getSlotId(), allocationID, rmServices.getLeaderID(), timeout);
 
 		slotRequestReplyFuture.handleAsync(new BiFunction<TMSlotRequestReply, Throwable, Void>()
{
 			@Override
@@ -488,15 +483,6 @@ public abstract class SlotManager {
 		pendingSlotRequests.clear();
 		freeSlots.clear();
 		allocationMap.clear();
-		leaderID = new UUID(0, 0);
-	}
-
-	// ------------------------------------------------------------------------
-	//  High availability (called by the ResourceManager)
-	// ------------------------------------------------------------------------
-
-	public void setLeaderUUID(UUID leaderSessionID) {
-		this.leaderID = leaderSessionID;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/35a44daa/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
index 0b2c42b..67b208d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
@@ -26,6 +26,7 @@ import org.mockito.Mockito;
 
 import java.util.Iterator;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.Executor;
 
 public class TestingSlotManager extends SlotManager {
@@ -60,6 +61,13 @@ public class TestingSlotManager extends SlotManager {
 
 	private static class TestingResourceManagerServices implements ResourceManagerServices {
 
+		private final UUID leaderID = UUID.randomUUID();
+
+		@Override
+		public UUID getLeaderID() {
+			return leaderID;
+		}
+
 		@Override
 		public void allocateResource(ResourceProfile resourceProfile) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35a44daa/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 0d2b40d..558d3c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -498,13 +498,21 @@ public class SlotManagerTest {
 
 		private static class TestingRmServices implements ResourceManagerServices {
 
-			private List<ResourceProfile> allocatedContainers;
+			private final UUID leaderID;
+
+			private final List<ResourceProfile> allocatedContainers;
 
 			public TestingRmServices() {
+				this.leaderID = UUID.randomUUID();
 				this.allocatedContainers = new LinkedList<>();
 			}
 
 			@Override
+			public UUID getLeaderID() {
+				return leaderID;
+			}
+
+			@Override
 			public void allocateResource(ResourceProfile resourceProfile) {
 				allocatedContainers.add(resourceProfile);
 			}


Mime
View raw message