flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [01/11] flink git commit: [FLINK-5501] [runtime] Extend RunningJobsRegistry to job status created/running/done
Date Tue, 28 Feb 2017 18:36:44 GMT
Repository: flink
Updated Branches:
  refs/heads/master daf0ccda4 -> 3086af534


[FLINK-5501] [runtime] Extend RunningJobsRegistry to job status created/running/done

This closes #3385


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7011d78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7011d78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7011d78

Branch: refs/heads/master
Commit: e7011d78a3019880a4e00ab5f697c3cfd20161bb
Parents: 40743aa
Author: shuai.xus <shuai.xus@alibaba-inc.com>
Authored: Wed Feb 22 14:15:43 2017 +0800
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Feb 28 18:59:09 2017 +0100

----------------------------------------------------------------------
 .../FsNegativeRunningJobsRegistry.java          | 86 ++++++++++++++++----
 .../highavailability/RunningJobsRegistry.java   | 32 ++++++++
 .../highavailability/ZookeeperRegistry.java     | 42 +++++++++-
 .../highavailability/nonha/NonHaRegistry.java   | 31 +++++++
 .../runtime/jobmaster/JobManagerRunner.java     | 35 ++++----
 .../minicluster/MiniClusterJobDispatcher.java   | 30 ++++---
 .../FsNegativeRunningJobsRegistryTest.java      | 25 ++++--
 .../highavailability/ZooKeeperRegistryTest.java | 15 +++-
 .../yarn/YarnFlinkApplicationMasterRunner.java  | 19 ++---
 9 files changed, 249 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
index 9d8b226..9e92263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
@@ -30,14 +30,19 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This {@link RunningJobsRegistry} tracks the status jobs via marker files,
- * marking finished jobs via marker files.
+ * marking running jobs via running marker files,
+ * marking finished jobs via finished marker files.
  * 
  * <p>The general contract is the following:
  * <ul>
  *     <li>Initially, a marker file does not exist (no one created it, yet), which
means
- *         the specific job is assumed to be running</li>
+ *         the specific job is pending</li>
+ *     <li>The first JobManager that granted leadership calls this service to create
the running marker file,
+ *         which marks the job as running.</li>
  *     <li>The JobManager that finishes calls this service to create the marker file,
  *         which marks the job as finished.</li>
+ *     <li>If a JobManager gains leadership but see the running marker file,
+ *         it will realize that the job has been scheduled and need reconciling.</li>
  *     <li>If a JobManager gains leadership at some point when shutdown is in progress,
  *         it will see the marker file and realize that the job is finished.</li>
  *     <li>The application framework is expected to clean the file once the application
@@ -52,7 +57,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
 
-	private static final String PREFIX = ".job_complete_";
+	private static final String DONE_PREFIX = ".job_complete_";
+
+	private static final String RUNNING_PREFIX = ".job_runing_";
 
 	private final FileSystem fileSystem;
 
@@ -108,21 +115,19 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
 	@Override
 	public void setJobRunning(JobID jobID) throws IOException {
 		checkNotNull(jobID, "jobID");
-		final Path filePath = createMarkerFilePath(jobID);
+		final Path filePath = createMarkerFilePath(RUNNING_PREFIX, jobID);
 
-		// delete the marker file, if it exists
-		try {
-			fileSystem.delete(filePath, false);
-		}
-		catch (FileNotFoundException e) {
-			// apparently job was already considered running
+		// create the file
+		// to avoid an exception if the job already exists, set overwrite=true
+		try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
+			out.write(42);
 		}
 	}
 
 	@Override
 	public void setJobFinished(JobID jobID) throws IOException {
 		checkNotNull(jobID, "jobID");
-		final Path filePath = createMarkerFilePath(jobID);
+		final Path filePath = createMarkerFilePath(DONE_PREFIX, jobID);
 
 		// create the file
 		// to avoid an exception if the job already exists, set overwrite=true
@@ -137,17 +142,64 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
 
 		// check for the existence of the file
 		try {
-			fileSystem.getFileStatus(createMarkerFilePath(jobID));
-			// file was found --> job is terminated
+			fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID));
+			// file was found --> job is running
+			return true;
+		}
+		catch (FileNotFoundException e) {
+			// file does not exist, job is not running
 			return false;
 		}
+	}
+
+	@Override
+	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
+		checkNotNull(jobID, "jobID");
+
+		// first check for the existence of the complete file
+		try {
+			fileSystem.getFileStatus(createMarkerFilePath(DONE_PREFIX, jobID));
+			// complete file was found --> job is terminated
+			return JobSchedulingStatus.DONE;
+		}
+		catch (FileNotFoundException e) {
+			// file does not exist, job is running or pending
+		}
+		// check for the existence of the running file
+		try {
+			fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID));
+			// running file was found --> job is terminated
+			return JobSchedulingStatus.RUNNING;
+		}
+		catch (FileNotFoundException e) {
+			// file does not exist, job is not scheduled
+			return JobSchedulingStatus.PENDING;
+		}
+	}
+
+	@Override
+	public void clearJob(JobID jobID) throws IOException {
+		checkNotNull(jobID, "jobID");
+		final Path runningFilePath = createMarkerFilePath(RUNNING_PREFIX, jobID);
+
+		// delete the running marker file, if it exists
+		try {
+			fileSystem.delete(runningFilePath, false);
+		}
+		catch (FileNotFoundException e) {
+		}
+
+		final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, jobID);
+
+		// delete the finished marker file, if it exists
+		try {
+			fileSystem.delete(doneFilePath, false);
+		}
 		catch (FileNotFoundException e) {
-			// file does not exist, job is still running
-			return true;
 		}
 	}
 
-	private Path createMarkerFilePath(JobID jobId) {
-		return new Path(basePath, PREFIX + jobId.toString());
+	private Path createMarkerFilePath(String prefix, JobID jobId) {
+		return new Path(basePath, prefix + jobId.toString());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
index e7c131c..020f2ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
@@ -33,6 +33,17 @@ import java.io.IOException;
  */
 public interface RunningJobsRegistry {
 
+	public enum JobSchedulingStatus {
+		/** Job has not been scheduled */
+		PENDING,
+
+		/** Job has been scheduled */
+		RUNNING,
+
+		/** Job has been finished */
+		DONE;
+	}
+
 	/**
 	 * Marks a job as running.
 	 * 
@@ -63,4 +74,25 @@ public interface RunningJobsRegistry {
 	 *                     failed and could not be retried.
 	 */
 	boolean isJobRunning(JobID jobID) throws IOException;
+
+	/**
+	 * Get the scheduing status of a job.
+	 *
+	 * @param jobID The id of the job to check.
+	 * @return The job scheduling status.
+	 * 
+	 * @throws IOException Thrown when the communication with the highly-available storage or
registry
+	 *                     failed and could not be retried.
+	 */
+	JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException;
+
+	/**
+	 * Clear job state form the registry, usually called after job finish
+	 *
+	 * @param jobID The id of the job to check.
+	 * 
+	 * @throws IOException Thrown when the communication with the highly-available storage or
registry
+	 *                     failed and could not be retried.
+	 */
+	void clearJob(JobID jobID) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
index c0621af..31a4535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.zookeeper.data.Stat;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -55,7 +56,7 @@ public class ZookeeperRegistry implements RunningJobsRegistry {
 		try {
 			String zkPath = runningJobPath + jobID.toString();
 			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-			this.client.setData().forPath(zkPath);
+			this.client.setData().forPath(zkPath, JobSchedulingStatus.RUNNING.name().getBytes(Charset.forName("utf-8")));
 		}
 		catch (Exception e) {
 			throw new IOException("Set running state to zk fail for job " + jobID.toString(), e);
@@ -69,7 +70,7 @@ public class ZookeeperRegistry implements RunningJobsRegistry {
 		try {
 			String zkPath = runningJobPath + jobID.toString();
 			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-			this.client.delete().forPath(zkPath);
+			this.client.setData().forPath(zkPath, JobSchedulingStatus.DONE.name().getBytes(Charset.forName("utf-8")));
 		}
 		catch (Exception e) {
 			throw new IOException("Set finished state to zk fail for job " + jobID.toString(), e);
@@ -83,7 +84,10 @@ public class ZookeeperRegistry implements RunningJobsRegistry {
 		try {
 			Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString());
 			if (stat != null) {
-				return true;
+				byte[] data = client.getData().forPath(runningJobPath + jobID.toString());
+				if (JobSchedulingStatus.RUNNING.name().equals(new String(data))) {
+					return true;
+				}
 			}
 			return false;
 		}
@@ -91,4 +95,36 @@ public class ZookeeperRegistry implements RunningJobsRegistry {
 			throw new IOException("Get running state from zk fail for job " + jobID.toString(), e);
 		}
 	}
+
+	@Override
+	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString());
+			if (stat != null) {
+				byte[] data = client.getData().forPath(runningJobPath + jobID.toString());
+				return JobSchedulingStatus.valueOf(new String(data));
+			}
+			return JobSchedulingStatus.PENDING;
+		}
+		catch (Exception e) {
+			throw new IOException("Get finished state from zk fail for job " + jobID.toString(), e);
+		}
+	}
+
+	@Override
+	public void clearJob(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			String zkPath = runningJobPath + jobID.toString();
+			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+			this.client.delete().forPath(zkPath);
+		}
+		catch (Exception e) {
+			throw new IOException("Clear job state from zk fail for " + jobID.toString(), e);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
index 85dd711..e331212 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability.nonha;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 
 import java.util.HashSet;
 
@@ -33,12 +34,16 @@ public class NonHaRegistry implements RunningJobsRegistry {
 	/** The currently running jobs */
 	private final HashSet<JobID> running = new HashSet<>();
 
+	/** The currently finished jobs */
+	private final HashSet<JobID> finished = new HashSet<>();
+
 	@Override
 	public void setJobRunning(JobID jobID) {
 		checkNotNull(jobID);
 
 		synchronized (running) {
 			running.add(jobID);
+			finished.remove(jobID);
 		}
 	}
 
@@ -48,6 +53,7 @@ public class NonHaRegistry implements RunningJobsRegistry {
 
 		synchronized (running) {
 			running.remove(jobID);
+			finished.add(jobID);
 		}
 	}
 
@@ -59,4 +65,29 @@ public class NonHaRegistry implements RunningJobsRegistry {
 			return running.contains(jobID);
 		}
 	}
+
+	@Override
+	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
+		checkNotNull(jobID);
+
+		synchronized (running) {
+			if (finished.contains(jobID)) {
+				return JobSchedulingStatus.DONE;
+			} else if (running.contains(jobID)) {
+				return JobSchedulingStatus.RUNNING;
+			} else {
+				return JobSchedulingStatus.PENDING;
+			}
+		}
+	}
+
+	@Override
+	public void clearJob(JobID jobID) {
+		checkNotNull(jobID);
+
+		synchronized (running) {
+			running.remove(jobID);
+			finished.remove(jobID);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 9d8e004..6bebd90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -359,29 +360,35 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			// it's okay that job manager wait for the operation complete
 			leaderElectionService.confirmLeaderSessionID(leaderSessionID);
 
-			boolean jobRunning;
+			JobSchedulingStatus schedulingStatus = JobSchedulingStatus.PENDING;
 			try {
-				jobRunning = runningJobsRegistry.isJobRunning(jobGraph.getJobID());
+				schedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
+				if (schedulingStatus.equals(JobSchedulingStatus.DONE)) {
+					log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
+					jobFinishedByOther();
+					return;
+				}
 			} catch (Throwable t) {
-				log.error("Could not access status (running/finished) of job {}. " +
-						"Falling back to assumption that job is running and attempting recovery...",
-						jobGraph.getJobID(), t);
-				jobRunning = true;
+				log.error("Could not access status (running/finished) of job {}. ",	jobGraph.getJobID(),
t);
+				onFatalError(t);
+				return;
 			}
 
 			// Double check the leadership after we confirm that, there is a small chance that multiple
 			// job managers schedule the same job after if they try to recover at the same time.
 			// This will eventually be noticed, but can not be ruled out from the beginning.
 			if (leaderElectionService.hasLeadership()) {
-				if (jobRunning) {
-					try {
-						jobManager.start(leaderSessionID);
-					} catch (Exception e) {
-						onFatalError(new Exception("Could not start the job manager.", e));
+				try {
+					// Now set the running status is after getting leader ship and 
+					// set finished status after job in terminated status.
+					// So if finding the job is running, it means someone has already run the job, need
recover.
+					if (schedulingStatus.equals(JobSchedulingStatus.PENDING)) {
+						runningJobsRegistry.setJobRunning(jobGraph.getJobID());
 					}
-				} else {
-					log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID());
-					jobFinishedByOther();
+
+					jobManager.start(leaderSessionID);
+				} catch (Exception e) {
+					onFatalError(new Exception("Could not start the job manager.", e));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 7fffaee..9178684 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -182,7 +182,7 @@ public class MiniClusterJobDispatcher {
 			checkState(!shutdown, "mini cluster is shut down");
 			checkState(runners == null, "mini cluster can only execute one job at a time");
 
-			DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
+			DetachedFinalizer finalizer = new DetachedFinalizer(job.getJobID(), numJobManagers);
 
 			this.runners = startJobRunners(job, finalizer, finalizer);
 		}
@@ -217,6 +217,7 @@ public class MiniClusterJobDispatcher {
 		finally {
 			// always clear the status for the next job
 			runners = null;
+			clearJobRunningState(job.getJobID());
 		}
 	}
 
@@ -227,16 +228,6 @@ public class MiniClusterJobDispatcher {
 
 		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
 
-		// we first need to mark the job as running in the HA services, so that the
-		// JobManager leader will recognize that it as work to do
-		try {
-			haServices.getRunningJobsRegistry().setJobRunning(job.getJobID());
-		}
-		catch (Throwable t) {
-			throw new JobExecutionException(job.getJobID(),
-					"Could not register the job at the high-availability services", t);
-		}
-
 		// start all JobManagers
 		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
 		for (int i = 0; i < numJobManagers; i++) {
@@ -273,6 +264,17 @@ public class MiniClusterJobDispatcher {
 		return runners;
 	}
 
+	private void clearJobRunningState(JobID jobID) {
+		// we mark the job as finished in the HA services, so need
+		// to remove the data after job finished
+		try {
+			haServices.getRunningJobsRegistry().clearJob(jobID);
+		}
+		catch (Throwable t) {
+			LOG.warn("Could not clear the job {} at the high-availability services", jobID.toString(),
t);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  test methods to simulate job master failures
 	// ------------------------------------------------------------------------
@@ -298,9 +300,12 @@ public class MiniClusterJobDispatcher {
 	 */
 	private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
 
+		private final JobID jobID;
+
 		private final AtomicInteger numJobManagersToWaitFor;
 
-		private DetachedFinalizer(int numJobManagersToWaitFor) {
+		private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) {
+			this.jobID = jobID;
 			this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
 		}
 
@@ -327,6 +332,7 @@ public class MiniClusterJobDispatcher {
 		private void decrementCheckAndCleanup() {
 			if (numJobManagersToWaitFor.decrementAndGet() == 0) {
 				MiniClusterJobDispatcher.this.runners = null;
+				MiniClusterJobDispatcher.this.clearJobRunningState(jobID);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
index f1ece0e..bbafcf0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Rule;
@@ -28,6 +29,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -45,20 +47,23 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger {
 
 		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri));
 
-		// initially, without any call, the job is considered running
-		assertTrue(registry.isJobRunning(jid));
+		// initially, without any call, the job is pending
+		assertFalse(registry.isJobRunning(jid));
+		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING);
 
-		// repeated setting should not affect the status
+		// after set running, the job is running
 		registry.setJobRunning(jid);
 		assertTrue(registry.isJobRunning(jid));
+		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.RUNNING);
 
 		// set the job to finished and validate
 		registry.setJobFinished(jid);
-		assertFalse(registry.isJobRunning(jid));
+		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
 
 		// another registry should pick this up
 		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
-		assertFalse(otherRegistry.isJobRunning(jid));
+		assertTrue(otherRegistry.isJobRunning(jid));
+		assertEquals(otherRegistry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
 	}
 
 	@Test
@@ -73,13 +78,21 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger {
 		// set the job to finished and validate
 		registry.setJobFinished(jid);
 		assertFalse(registry.isJobRunning(jid));
+		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
 
-		// set the job to back to running and validate
+		// set the job to running does not overwrite the finished status
 		registry.setJobRunning(jid);
 		assertTrue(registry.isJobRunning(jid));
+		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
 
 		// another registry should pick this up
 		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
 		assertTrue(otherRegistry.isJobRunning(jid));
+		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+
+		// clear the running and finished marker, it will be pending
+		otherRegistry.clearJob(jid);
+		assertFalse(otherRegistry.isJobRunning(jid));
+		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
index 72982c8..8c91898 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
@@ -22,6 +22,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
@@ -29,7 +30,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class ZooKeeperRegistryTest extends TestLogger {
 	private TestingServer testingServer;
@@ -61,14 +64,20 @@ public class ZooKeeperRegistryTest extends TestLogger {
 
 		try {
 			JobID jobID = JobID.generate();
-			assertTrue(!zkRegistry.isJobRunning(jobID));
+			assertFalse(zkRegistry.isJobRunning(jobID));
+			assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING);
 
 			zkRegistry.setJobRunning(jobID);
 			assertTrue(zkRegistry.isJobRunning(jobID));
+			assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.RUNNING);
 
 			zkRegistry.setJobFinished(jobID);
-			assertTrue(!zkRegistry.isJobRunning(jobID));
+			assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.DONE);
+			assertFalse(zkRegistry.isJobRunning(jobID));
 
+			zkRegistry.clearJob(jobID);
+			assertFalse(zkRegistry.isJobRunning(jobID));
+			assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING);
 		} finally {
 			if (zkHaService != null) {
 				zkHaService.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 257212b..e2aa6ec 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -206,16 +205,6 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 		//TODO: generate the job graph from user's jar
 		jobGraph = loadJobGraph(config);
 
-		// we first need to mark the job as running in the HA services, so that the
-		// JobManager leader will recognize that it as work to do
-		try {
-			haServices.getRunningJobsRegistry().setJobRunning(jobGraph.getJobID());
-		}
-		catch (Throwable t) {
-			throw new JobExecutionException(jobGraph.getJobID(),
-					"Could not register the job at the high-availability services", t);
-		}
-
 		// now the JobManagerRunner
 		return new JobManagerRunner(
 				jobGraph, config,
@@ -226,6 +215,14 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	}
 
 	protected void shutdown(ApplicationStatus status, String msg) {
+		// Need to clear the job state in the HA services before shutdown
+		try {
+			haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
+		}
+		catch (Throwable t) {
+			LOG.warn("Could not clear the job at the high-availability services", t);
+		}
+
 		synchronized (lock) {
 			if (jobManagerRunner != null) {
 				try {


Mime
View raw message