flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-5971] [flip-6] Add timeout for registered jobs on the ResourceManager
Date Mon, 13 Mar 2017 14:03:58 GMT
Repository: flink
Updated Branches:
  refs/heads/master 04aee61d8 -> fcd264a70


[FLINK-5971] [flip-6] Add timeout for registered jobs on the ResourceManager

This PR introduces a timeout for inactive jobs on the ResourceManager. A job is inactive
if no active leader is known for it. If a job times out, it is removed from the
ResourceManager. Additionally, this PR removes the dependency of the JobLeaderIdService
on the RunningJobsRegistry.

Fix YarnFlinkApplicationMasterRunner to use correct arguments for JobLeaderIdService

Fix race condition in JobLeaderIdListener#cancelTimeout

This closes #3488.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcd264a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcd264a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcd264a7

Branch: refs/heads/master
Commit: fcd264a707d3dd8ef4247825752c8639732c943c
Parents: 04aee61
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Mar 6 16:57:43 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Mar 13 15:03:18 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/AkkaOptions.java |   7 +
 .../configuration/ResourceManagerOptions.java   |  40 +++
 .../resourcemanager/JobLeaderIdActions.java     |   8 +-
 .../resourcemanager/JobLeaderIdService.java     | 119 +++++---
 .../resourcemanager/ResourceManager.java        |   6 +-
 .../ResourceManagerConfiguration.java           |  48 ++--
 .../resourcemanager/ResourceManagerRunner.java  |   5 +-
 .../resourcemanager/JobLeaderIdServiceTest.java | 269 +++++++++++++++++++
 .../resourcemanager/ResourceManagerHATest.java  |  10 +-
 .../ResourceManagerJobMasterTest.java           |  10 +-
 .../ResourceManagerTaskExecutorTest.java        |  10 +-
 .../slotmanager/SlotProtocolTest.java           |  21 +-
 .../taskexecutor/TaskExecutorITCase.java        |  10 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |   5 +-
 14 files changed, 498 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 7e4c2b7..97b209e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -29,6 +29,13 @@ import org.apache.flink.annotation.PublicEvolving;
 public class AkkaOptions {
 
 	/**
+	 * Timeout for akka ask calls
+	 */
+	public static final ConfigOption<String> AKKA_ASK_TIMEOUT = ConfigOptions
+		.key("akka.ask.timeout")
+		.defaultValue("10 s");
+
+	/**
 	 * The Akka tcp connection timeout.
 	 */
 	public static final ConfigOption<String> AKKA_TCP_TIMEOUT = ConfigOptions

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
new file mode 100644
index 0000000..6a09f19
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The set of configuration options relating to the ResourceManager
+ */
+@PublicEvolving
+public class ResourceManagerOptions {
+
+	/**
+	 * Timeout for jobs which don't have a job manager as leader assigned.
+	 */
+	public static final ConfigOption<String> JOB_TIMEOUT = ConfigOptions
+		.key("resourcemanager.job.timeout")
+		.defaultValue("5 minutes");
+
+	// ---------------------------------------------------------------------------------------------
+
+	/** Not intended to be instantiated */
+	private ResourceManagerOptions() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
index 58777ef..4ca6209 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
@@ -36,11 +36,13 @@ public interface JobLeaderIdActions {
 	void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId);
 
 	/**
-	 * Request to remove the job from the {@link JobLeaderIdService}.
+	 * Notify a job timeout. The job is identified by the given JobID. In order to check
+	 * for the validity of the timeout the timeout id of the triggered timeout is provided.
 	 *
-	 * @param jobId identifying the job to remove
+	 * @param jobId JobID which identifies the timed out job
+	 * @param timeoutId Id of the calling timeout to differentiate valid from invalid timeouts
 	 */
-	void removeJob(JobID jobId);
+	void notifyJobTimeout(JobID jobId, UUID timeoutId);
 
 	/**
 	 * Callback to report occurring errors.

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 7ef39de..8bffcd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.ExceptionUtils;
@@ -32,11 +32,14 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Service which retrieves for a registered job the current job leader id (the leader id of the
@@ -51,8 +54,9 @@ public class JobLeaderIdService {
 	/** High availability services to use by this service */
 	private final HighAvailabilityServices highAvailabilityServices;
 
-	/** Registry to retrieve running jobs */
-	private final RunningJobsRegistry runningJobsRegistry;
+	private final ScheduledExecutor scheduledExecutor;
+
+	private final Time jobTimeout;
 
 	/** Map of currently monitored jobs */
 	private final Map<JobID, JobLeaderIdListener> jobLeaderIdListeners;
@@ -60,10 +64,13 @@ public class JobLeaderIdService {
 	/** Actions to call when the job leader changes */
 	private JobLeaderIdActions jobLeaderIdActions;
 
-	public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception {
-		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
-
-		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
+	public JobLeaderIdService(
+			HighAvailabilityServices highAvailabilityServices,
+			ScheduledExecutor scheduledExecutor,
+			Time jobTimeout) throws Exception {
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices");
+		this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor");
+		this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout");
 
 		jobLeaderIdListeners = new HashMap<>(4);
 
@@ -142,8 +149,8 @@ public class JobLeaderIdService {
 		if (!jobLeaderIdListeners.containsKey(jobId)) {
 			LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(jobId);
 
-			JobLeaderIdListener jobidListener = new JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService);
-			jobLeaderIdListeners.put(jobId, jobidListener);
+			JobLeaderIdListener jobIdListener = new JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService);
+			jobLeaderIdListeners.put(jobId, jobIdListener);
 		}
 	}
 
@@ -183,6 +190,16 @@ public class JobLeaderIdService {
 		return listener.getLeaderIdFuture();
 	}
 
+	public boolean isValidTimeout(JobID jobId, UUID timeoutId) {
+		JobLeaderIdListener jobLeaderIdListener = jobLeaderIdListeners.get(jobId);
+
+		if (null != jobLeaderIdListener) {
+			return Objects.equals(timeoutId, jobLeaderIdListener.getTimeoutId());
+		} else {
+			return false;
+		}
+	}
+
 	// --------------------------------------------------------------------------------
 	// Static utility classes
 	// --------------------------------------------------------------------------------
@@ -193,6 +210,7 @@ public class JobLeaderIdService {
 	 * listener.
 	 */
 	private final class JobLeaderIdListener implements LeaderRetrievalListener {
+		private final Object timeoutLock = new Object();
 		private final JobID jobId;
 		private final JobLeaderIdActions listenerJobLeaderIdActions;
 		private final LeaderRetrievalService leaderRetrievalService;
@@ -200,6 +218,15 @@ public class JobLeaderIdService {
 		private volatile CompletableFuture<UUID> leaderIdFuture;
 		private volatile boolean running = true;
 
+		/** Null if no timeout has been scheduled; otherwise non null */
+		@Nullable
+		private  volatile ScheduledFuture<?> timeoutFuture;
+
+		/** Null if no timeout has been scheduled; otherwise non null */
+		@Nullable
+		private volatile UUID timeoutId;
+
+
 		private JobLeaderIdListener(
 				JobID jobId,
 				JobLeaderIdActions listenerJobLeaderIdActions,
@@ -210,6 +237,8 @@ public class JobLeaderIdService {
 
 			leaderIdFuture = new FlinkCompletableFuture<>();
 
+			activateTimeout();
+
 			// start the leader service we're listening to
 			leaderRetrievalService.start(this);
 		}
@@ -218,9 +247,15 @@ public class JobLeaderIdService {
 			return leaderIdFuture;
 		}
 
+		@Nullable
+		public UUID getTimeoutId() {
+			return timeoutId;
+		}
+
 		public void stop() throws Exception {
 			running = false;
 			leaderRetrievalService.stop();
+			cancelTimeout();
 			leaderIdFuture.completeExceptionally(new Exception("Job leader id service has been stopped."));
 		}
 
@@ -244,29 +279,22 @@ public class JobLeaderIdService {
 					leaderIdFuture.complete(leaderSessionId);
 				}
 
-				try {
-					final JobSchedulingStatus jobStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
-					if (jobStatus == JobSchedulingStatus.PENDING || jobStatus == JobSchedulingStatus.RUNNING) {
-						if (leaderSessionId == null) {
-							// there is no new leader
-							if (previousJobLeaderId != null) {
-								// we had a previous job leader, so notify about his lost leadership
-								listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
-							}
-						} else {
-							if (previousJobLeaderId != null && !leaderSessionId.equals(previousJobLeaderId)) {
-								// we had a previous leader and he's not the same as the new leader
-								listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
-							}
+				if (previousJobLeaderId != null && !previousJobLeaderId.equals(leaderSessionId)) {
+					// we had a previous job leader, so notify about his lost leadership
+					listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
+
+					if (null == leaderSessionId) {
+						// No current leader active ==> Set a timeout for the job
+						activateTimeout();
+
+						// check if we got stopped asynchronously
+						if (!running) {
+							cancelTimeout();
 						}
-					} else {
-						// the job is no longer running so remove it
-						listenerJobLeaderIdActions.removeJob(jobId);
 					}
-				} catch (IOException e) {
-					// cannot tell whether the job is still running or not so just remove the listener
-					LOG.debug("Encountered an error while checking the job registry for running jobs.", e);
-					listenerJobLeaderIdActions.removeJob(jobId);
+				} else if (null != leaderSessionId) {
+					// Cancel timeout because we've found an active leader for it
+					cancelTimeout();
 				}
 			} else {
 				LOG.debug("A leader id change {}@{} has been detected after the listener has been stopped.",
@@ -283,5 +311,32 @@ public class JobLeaderIdService {
 					JobLeaderIdListener.class.getSimpleName(), exception);
 			}
 		}
+
+		private void activateTimeout() {
+			synchronized (timeoutLock) {
+				cancelTimeout();
+
+				final UUID newTimeoutId = UUID.randomUUID();
+
+				timeoutId = newTimeoutId;
+				timeoutFuture = scheduledExecutor.schedule(new Runnable() {
+					@Override
+					public void run() {
+						listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId);
+					}
+				}, jobTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			}
+		}
+
+		private void cancelTimeout() {
+			synchronized (timeoutLock) {
+				if (timeoutFuture != null) {
+					timeoutFuture.cancel(true);
+				}
+
+				timeoutFuture = null;
+				timeoutId = null;
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3bcbfda..badfbe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -817,11 +817,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		}
 
 		@Override
-		public void removeJob(final JobID jobId) {
+		public void notifyJobTimeout(final JobID jobId, final UUID timeoutId) {
 			runAsync(new Runnable() {
 				@Override
 				public void run() {
-					ResourceManager.this.removeJob(jobId);
+					if (jobLeaderIdService.isValidTimeout(jobId, timeoutId)) {
+						removeJob(jobId);
+					}
 				}
 			});
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
index 920f1fc..d04d852 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
@@ -19,10 +19,9 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
@@ -34,10 +33,15 @@ public class ResourceManagerConfiguration {
 
 	private final Time timeout;
 	private final Time heartbeatInterval;
+	private final Time jobTimeout;
 
-	public ResourceManagerConfiguration(Time timeout, Time heartbeatInterval) {
-		this.timeout = Preconditions.checkNotNull(timeout);
-		this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval);
+	public ResourceManagerConfiguration(
+			Time timeout,
+			Time heartbeatInterval,
+			Time jobTimeout) {
+		this.timeout = Preconditions.checkNotNull(timeout, "timeout");
+		this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval");
+		this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout");
 	}
 
 	public Time getTimeout() {
@@ -48,39 +52,45 @@ public class ResourceManagerConfiguration {
 		return heartbeatInterval;
 	}
 
+	public Time getJobTimeout() {
+		return jobTimeout;
+	}
+
 	// --------------------------------------------------------------------------
 	// Static factory methods
 	// --------------------------------------------------------------------------
 
 	public static ResourceManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
-		ConfigOption<String> timeoutOption = ConfigOptions
-			.key(ConfigConstants.AKKA_ASK_TIMEOUT)
-			.defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
-
-		final String strTimeout = configuration.getString(timeoutOption);
+		final String strTimeout = configuration.getString(AkkaOptions.AKKA_ASK_TIMEOUT);
 		final Time timeout;
 
 		try {
 			timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
 		} catch (NumberFormatException e) {
 			throw new ConfigurationException("Could not parse the resource manager's timeout " +
-				"value " + timeoutOption + '.', e);
+				"value " + AkkaOptions.AKKA_ASK_TIMEOUT + '.', e);
 		}
 
-		ConfigOption<String> heartbeatIntervalOption = ConfigOptions
-			.key(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL)
-			.defaultValue(timeout.toString());
-
-		final String strHeartbeatInterval = configuration.getString(heartbeatIntervalOption);
+		final String strHeartbeatInterval = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL);
 		final Time heartbeatInterval;
 
 		try {
 			heartbeatInterval = Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis());
 		} catch (NumberFormatException e) {
 			throw new ConfigurationException("Could not parse the resource manager's heartbeat interval " +
-				"value " + timeoutOption + '.', e);
+				"value " + AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e);
+		}
+
+		final String strJobTimeout = configuration.getString(ResourceManagerOptions.JOB_TIMEOUT);
+		final Time jobTimeout;
+
+		try {
+			jobTimeout = Time.milliseconds(Duration.apply(strJobTimeout).toMillis());
+		} catch (NumberFormatException e) {
+			throw new ConfigurationException("Could not parse the resource manager's job timeout " +
+				"value " + ResourceManagerOptions.JOB_TIMEOUT + '.', e);
 		}
 
-		return new ResourceManagerConfiguration(timeout, heartbeatInterval);
+		return new ResourceManagerConfiguration(timeout, heartbeatInterval, jobTimeout);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index e0dee0b..749b407 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -54,7 +54,10 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
-		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
+		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor(),
+			resourceManagerConfiguration.getJobTimeout());
 
 		this.resourceManager = new StandaloneResourceManager(
 			rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
new file mode 100644
index 0000000..d5e99bd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class JobLeaderIdServiceTest extends TestLogger {
+
+	/**
+	 * Tests adding a job and finding out its leader id
+	 */
+	@Test(timeout = 10000)
+	public void testAddingJob() throws Exception {
+		final JobID jobId = new JobID();
+		final String address = "foobar";
+		final UUID leaderId = UUID.randomUUID();
+		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+
+		highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
+
+		ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		Time timeout = Time.milliseconds(5000L);
+		JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
+
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			scheduledExecutor,
+			timeout);
+
+		jobLeaderIdService.start(jobLeaderIdActions);
+
+		jobLeaderIdService.addJob(jobId);
+
+		Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+
+		// notify the leader id service about the new leader
+		leaderRetrievalService.notifyListener(address, leaderId);
+
+		assertEquals(leaderId, leaderIdFuture.get());
+
+		assertTrue(jobLeaderIdService.containsJob(jobId));
+	}
+
+	/**
+	 * Tests that removing a job completes the job leader id future exceptionally
+	 */
+	@Test(timeout = 10000)
+	public void testRemovingJob() throws Exception {
+		final JobID jobId = new JobID();
+		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+
+		highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
+
+		ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		Time timeout = Time.milliseconds(5000L);
+		JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
+
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			scheduledExecutor,
+			timeout);
+
+		jobLeaderIdService.start(jobLeaderIdActions);
+
+		jobLeaderIdService.addJob(jobId);
+
+		Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+
+		// remove the job before we could find a leader
+		jobLeaderIdService.removeJob(jobId);
+
+		assertFalse(jobLeaderIdService.containsJob(jobId));
+
+		try {
+			leaderIdFuture.get();
+
+			fail("The leader id future should be completed exceptionally.");
+		} catch (ExecutionException ignored) {
+			// expected exception
+		}
+	}
+
+	/**
+	 * Tests that the initial job registration registers a timeout which will call
+	 * {@link JobLeaderIdActions#notifyJobTimeout(JobID, UUID)} when executed.
+	 */
+	@Test
+	public void testInitialJobTimeout() throws Exception {
+		final JobID jobId = new JobID();
+		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+
+		highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
+
+		ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		Time timeout = Time.milliseconds(5000L);
+		JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
+
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			scheduledExecutor,
+			timeout);
+
+		jobLeaderIdService.start(jobLeaderIdActions);
+
+		jobLeaderIdService.addJob(jobId);
+
+		assertTrue(jobLeaderIdService.containsJob(jobId));
+
+		ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+		verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), anyLong(), any(TimeUnit.class));
+
+		Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
+		timeoutRunnable.run();
+
+		ArgumentCaptor<UUID> timeoutIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class);
+
+		verify(jobLeaderIdActions, times(1)).notifyJobTimeout(eq(jobId), timeoutIdArgumentCaptor.capture());
+
+		assertTrue(jobLeaderIdService.isValidTimeout(jobId, timeoutIdArgumentCaptor.getValue()));
+	}
+
+	/**
+	 * Tests that a timeout get cancelled once a job leader has been found. Furthermore, it tests
+	 * that a new timeout is registered after the jobmanager has lost leadership.
+	 */
+	@Test(timeout = 10000)
+	public void jobTimeoutAfterLostLeadership() throws Exception {
+		final JobID jobId = new JobID();
+		final String address = "foobar";
+		final UUID leaderId = UUID.randomUUID();
+		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+
+		highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
+
+		ScheduledFuture<?> timeout1 = mock(ScheduledFuture.class);
+		ScheduledFuture<?> timeout2 = mock(ScheduledFuture.class);
+		final Queue<ScheduledFuture<?>> timeoutQueue = new ArrayDeque<>(Arrays.asList(timeout1, timeout2));
+		ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+
+		final AtomicReference<Runnable> lastRunnable = new AtomicReference<>();
+		doAnswer(new Answer() {
+			@Override
+			public Object answer(InvocationOnMock invocation) throws Throwable {
+				lastRunnable.set((Runnable) invocation.getArguments()[0]);
+
+				return timeoutQueue.poll();
+			}
+		}).when(scheduledExecutor).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+		Time timeout = Time.milliseconds(5000L);
+		JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
+
+		final AtomicReference<UUID> lastTimeoutId = new AtomicReference<>();
+
+		doAnswer(new Answer() {
+			@Override
+			public Object answer(InvocationOnMock invocation) throws Throwable {
+				lastTimeoutId.set((UUID) invocation.getArguments()[1]);
+				return null;
+			}
+		}).when(jobLeaderIdActions).notifyJobTimeout(eq(jobId), any(UUID.class));
+
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			scheduledExecutor,
+			timeout);
+
+		jobLeaderIdService.start(jobLeaderIdActions);
+
+		jobLeaderIdService.addJob(jobId);
+
+		Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+
+		// notify the leader id service about the new leader
+		leaderRetrievalService.notifyListener(address, leaderId);
+
+		assertEquals(leaderId, leaderIdFuture.get());
+
+		assertTrue(jobLeaderIdService.containsJob(jobId));
+
+		// check that the first timeout got cancelled
+		verify(timeout1, times(1)).cancel(anyBoolean());
+
+		verify(scheduledExecutor, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+		// initial timeout runnable which should no longer have an effect
+		Runnable runnable = lastRunnable.get();
+
+		assertNotNull(runnable);
+
+		runnable.run();
+
+		verify(jobLeaderIdActions, times(1)).notifyJobTimeout(eq(jobId), any(UUID.class));
+
+		// the timeout should no longer be valid
+		assertFalse(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get()));
+
+		// lose leadership
+		leaderRetrievalService.notifyListener("", null);
+
+		verify(scheduledExecutor, times(2)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+
+		// the second runnable should be the new timeout
+		runnable = lastRunnable.get();
+
+		assertNotNull(runnable);
+
+		runnable.run();
+
+		verify(jobLeaderIdActions, times(2)).notifyJobTimeout(eq(jobId), any(UUID.class));
+
+		// the new timeout should be valid
+		assertTrue(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get()));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 2e52eeb..58dedc3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -46,10 +46,16 @@ public class ResourceManagerHATest {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
-		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
+			Time.seconds(5L),
+			Time.seconds(5L),
+			Time.minutes(5L));
 		SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
-		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor(),
+			resourceManagerConfiguration.getJobTimeout());
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 		final ResourceManager resourceManager =

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 2622634..031f76e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -193,10 +193,16 @@ public class ResourceManagerJobMasterTest {
 		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
 
-		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
+			Time.seconds(5L),
+			Time.seconds(5L),
+			Time.minutes(5L));
 		SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
-		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor(),
+			resourceManagerConfiguration.getJobTimeout());
 
 		ResourceManager resourceManager = new StandaloneResourceManager(
 			rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 1016181..4456235 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -146,9 +146,15 @@ public class ResourceManagerTaskExecutorTest {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
-		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
+			Time.seconds(5L),
+			Time.seconds(5L),
+			Time.minutes(5L));
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
-		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor(),
+			resourceManagerConfiguration.getJobTimeout());
 
 
 		StandaloneResourceManager resourceManager =

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a3ba436..1e5edbe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -107,8 +107,15 @@ public class SlotProtocolTest extends TestLogger {
 		TestingLeaderElectionService rmLeaderElectionService =
 			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
-		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
-		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHaServices);
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
+			Time.seconds(5L),
+			Time.seconds(5L),
+			Time.minutes(5L));
+
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			testingHaServices,
+			testRpcService.getScheduledExecutor(),
+			resourceManagerConfiguration.getJobTimeout());
 
 		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		SpiedResourceManager resourceManager =
@@ -208,9 +215,15 @@ public class SlotProtocolTest extends TestLogger {
 			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
-		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
+			Time.seconds(5L),
+			Time.seconds(5L),
+			Time.minutes(5L));
 
-		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHaServices);
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			testingHaServices,
+			testRpcService.getScheduledExecutor(),
+			resourceManagerConfiguration.getJobTimeout());
 
 		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		ResourceManager<ResourceID> resourceManager =

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 0f884f2..898584c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -93,9 +93,15 @@ public class TaskExecutorITCase {
 		testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jmLeaderId));
 
 		TestingSerialRpcService rpcService = new TestingSerialRpcService();
-		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.milliseconds(500L), Time.milliseconds(500L));
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
+			Time.milliseconds(500L),
+			Time.milliseconds(500L),
+			Time.minutes(5L));
 		SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
-		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHAServices);
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			testingHAServices,
+			rpcService.getScheduledExecutor(),
+			resourceManagerConfiguration.getJobTimeout());
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
 
 		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index e2aa6ec..ddeb02e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -187,7 +187,10 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	private ResourceManager<?> createResourceManager(Configuration config) throws Exception {
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
 		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
-		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(haServices);
+		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			haServices,
+			commonRpcService.getScheduledExecutor(),
+			resourceManagerConfiguration.getJobTimeout());
 
 		return new YarnResourceManager(config,
 				ENV,


Mime
View raw message