flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-3800] [runtime] Introduce SUSPENDED job status
Date Thu, 23 Jun 2016 16:04:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master cfe629340 -> 6420c1c26


http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
new file mode 100644
index 0000000..8ee4973
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class JobManagerHARecoveryTest {
+
+	private static ActorSystem system;
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/** Creates the local actor system that is stored in the static {@code system} field. */
+	@BeforeClass
+	public static void setup() {
+		final Configuration actorSystemConfig = new Configuration();
+		system = AkkaUtils.createLocalActorSystem(actorSystemConfig);
+	}
+
+	/** Shuts down the actor system created in {@link #setup()}. */
+	@AfterClass
+	public static void teardown() {
+		final ActorSystem actorSystemToStop = system;
+		JavaTestKit.shutdownActorSystem(actorSystemToStop);
+	}
+
+	/**
+	 * Tests that the persisted job is not removed from the SubmittedJobGraphStore if the JobManager
+	 * loses its leadership. Furthermore, it tests that the job manager can recover the job from
+	 * the SubmittedJobGraphStore.
+	 *
+	 * @throws Exception if the temporary folder cannot be created, a component fails to start, or
+	 *                   waiting on one of the futures times out
+	 */
+	@Test
+	public void testJobRecoveryWhenLosingLeadership() throws Exception {
+
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+		FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
+		// overall time budget of the test; every ask/await below uses what is left of it
+		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+		Configuration flinkConfiguration = new Configuration();
+		UUID leaderSessionID = UUID.randomUUID();
+		UUID newLeaderSessionID = UUID.randomUUID();
+		int slots = 2;
+		ActorRef archive = null;
+		ActorRef jobManager = null;
+		ActorRef taskManager = null;
+
+		flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.newFolder().toString());
+		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+
+		try {
+
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+			// testing services which let the test grant/revoke leadership and announce the leader
+			MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
+			TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
+			TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+			InstanceManager instanceManager = new InstanceManager();
+			instanceManager.addInstanceListener(scheduler);
+
+			archive = system.actorOf(Props.create(
+				MemoryArchivist.class,
+				10), "archive");
+
+			Props jobManagerProps = Props.create(
+				TestingJobManager.class,
+				flinkConfiguration,
+				new ForkJoinPool(),
+				instanceManager,
+				scheduler,
+				new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
+				archive,
+				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+				timeout,
+				myLeaderElectionService,
+				mySubmittedJobGraphStore,
+				new StandaloneCheckpointRecoveryFactory(),
+				new SavepointStore(new HeapStateStore()),
+				jobRecoveryTimeout);
+
+			jobManager = system.actorOf(jobManagerProps, "jobmanager");
+			ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
+
+			taskManager = TaskManager.startTaskManagerComponentsAndActor(
+				flinkConfiguration,
+				ResourceID.generate(),
+				system,
+				"localhost",
+				Option.apply("taskmanager"),
+				Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
+				true,
+				TestingTaskManager.class);
+
+			ActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID);
+
+			Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
+
+			Await.ready(tmAlive, deadline.timeLeft());
+
+			// one blocking vertex which occupies all slots until BlockingInvokable.unblock() is called
+			JobVertex sourceJobVertex = new JobVertex("Source");
+			sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+			sourceJobVertex.setParallelism(slots);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
+
+			// register the notifications before leadership is granted so that they cannot be missed
+			Future<Object> isLeader = gateway.ask(
+				TestingJobManagerMessages.getNotifyWhenLeader(),
+				deadline.timeLeft());
+
+			Future<Object> isConnectedToJobManager = tmGateway.ask(
+				new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
+				deadline.timeLeft());
+
+			// tell the JobManager that it is the leader
+			myLeaderElectionService.isLeader(leaderSessionID);
+			// tell the TaskManager who the leader is
+			myLeaderRetrievalService.notifyListener(gateway.path(), leaderSessionID);
+
+			Await.ready(isLeader, deadline.timeLeft());
+			Await.ready(isConnectedToJobManager, deadline.timeLeft());
+
+			// submit blocking job
+			Future<Object> jobSubmitted = gateway.ask(
+				new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
+				deadline.timeLeft());
+
+			Await.ready(jobSubmitted, deadline.timeLeft());
+
+			Future<Object> jobRemoved = gateway.ask(
+				new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
+				deadline.timeLeft());
+
+			// Revoke leadership
+			myLeaderElectionService.notLeader();
+
+			// check that the job gets removed from the JobManager
+			Await.ready(jobRemoved, deadline.timeLeft());
+			// but stays in the submitted job graph store
+			assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
+
+			Future<Object> jobRunning = gateway.ask(
+				new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
+				deadline.timeLeft());
+
+			// Make the JobManager the leader again, this time with a new leader session ID
+			myLeaderElectionService.isLeader(newLeaderSessionID);
+			// tell the TaskManager about it
+			myLeaderRetrievalService.notifyListener(gateway.path(), newLeaderSessionID);
+
+			// wait until the job is recovered and reaches state RUNNING
+			Await.ready(jobRunning, deadline.timeLeft());
+
+			Future<Object> jobFinished = gateway.ask(
+				new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
+				deadline.timeLeft());
+
+			BlockingInvokable.unblock();
+
+			// wait until the job has finished
+			Await.ready(jobFinished, deadline.timeLeft());
+
+			// check that the job has been removed from the submitted job graph store
+			assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
+		} finally {
+			// stop whichever actors were started, even if the test failed half-way through
+			if (archive != null) {
+				archive.tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (jobManager != null) {
+				jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (taskManager != null) {
+				taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+	}
+
+	/**
+	 * In-memory {@link SubmittedJobGraphStore} which keeps the submitted job graphs in a map keyed
+	 * by job ID.
+	 *
+	 * <p>All map accesses are synchronized on the store instance: the store is handed to the
+	 * JobManager while the test thread queries it through {@link #contains(JobID)}, so the map is
+	 * accessed by more than one thread.
+	 */
+	static class MySubmittedJobGraphStore implements SubmittedJobGraphStore {
+		// guarded by "this"
+		final Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();
+
+		@Override
+		public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
+			// nothing to start; the listener is not used by this store
+		}
+
+		@Override
+		public void stop() throws Exception {
+			// nothing to stop; stored job graphs are intentionally kept
+		}
+
+		/** Returns a snapshot copy of all currently stored job graphs. */
+		@Override
+		public synchronized List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+			return new ArrayList<>(storedJobs.values());
+		}
+
+		/** Returns the stored job graph for the given job ID, or an empty option if there is none. */
+		@Override
+		public synchronized Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+			// Option.apply maps null to None, so a single lookup covers both cases
+			return Option.apply(storedJobs.get(jobId));
+		}
+
+		/** Stores the job graph under its job ID, replacing a previous entry for the same ID. */
+		@Override
+		public synchronized void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+			storedJobs.put(jobGraph.getJobId(), jobGraph);
+		}
+
+		/** Removes the job graph with the given job ID; a no-op if it is not stored. */
+		@Override
+		public synchronized void removeJobGraph(JobID jobId) throws Exception {
+			storedJobs.remove(jobId);
+		}
+
+		/** Returns whether a job graph with the given job ID is currently stored. */
+		synchronized boolean contains(JobID jobId) {
+			return storedJobs.containsKey(jobId);
+		}
+	}
+
+	/**
+	 * Invokable whose {@link #invoke()} blocks until {@link #unblock()} has been called.
+	 *
+	 * <p>The flag and the monitor are static: a single {@code unblock()} call releases every
+	 * instance, and the flag is never reset, so later {@code invoke()} calls return immediately.
+	 */
+	public static class BlockingInvokable extends AbstractInvokable {
+
+		private static final Object lock = new Object();
+
+		// guarded by "lock"
+		private static boolean blocking = true;
+
+		@Override
+		public void invoke() throws Exception {
+			// The flag must be checked while holding the monitor. Checking it outside allowed
+			// unblock() to clear the flag and notify between the check and wait(), which left
+			// this thread waiting forever (lost wake-up). The loop also covers spurious wake-ups.
+			synchronized (lock) {
+				while (blocking) {
+					lock.wait();
+				}
+			}
+		}
+
+		/** Releases all threads blocked in {@link #invoke()} and lets future calls pass through. */
+		public static void unblock() {
+			synchronized (lock) {
+				blocking = false;
+				lock.notifyAll();
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index ccd2156..57de2cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -70,7 +70,11 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
 
-		cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, new FixedDelayRestartStrategy(9999, 100));
+		configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
+		configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999);
+		configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 milli");
+
+		cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false);
 		cluster.start(false);
 
 		// wait for actors to be alive so that they have started their leader retrieval service
@@ -170,7 +174,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 			if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
 				ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message;
 
-				if (jobStatusChanged.newJobStatus().isTerminalState()) {
+				if (jobStatusChanged.newJobStatus().isGloballyTerminalState() || jobStatusChanged.newJobStatus() == JobStatus.SUSPENDED) {
 					terminalState.success(true);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 89f462c..19cc444 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -35,6 +34,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -47,6 +48,8 @@ import static org.junit.Assert.*;
 
 public class LeaderChangeStateCleanupTest extends TestLogger {
 
+	private static Logger LOG = LoggerFactory.getLogger(LeaderChangeStateCleanupTest.class);
+
 	private static FiniteDuration timeout = TestingUtils.TESTING_DURATION();
 
 	private int numJMs = 2;
@@ -68,7 +71,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
 
-		cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, null);
+		cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false);
 		cluster.start(false); // TaskManagers don't have to register at the JobManager
 
 		cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive
@@ -225,11 +228,15 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 
 		Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
 
-		// make JM(0) again the leader --> this implies first a leadership revokal
+		LOG.info("Make JM(0) again the leader. This should first revoke the leadership.");
+
+		// make JM(0) again the leader --> this implies first a leadership revocation
 		cluster.grantLeadership(0, newLeaderSessionID);
 
 		Await.ready(jobRemoval, timeout);
 
+		LOG.info("Job removed.");
+
 		// The TMs should not be able to reconnect since they don't know the current leader
 		// session ID
 		try {
@@ -239,6 +246,8 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 			// expected exception since the TMs have still the old leader session ID
 		}
 
+		LOG.info("Notify TMs about the new (old) leader.");
+
 		// notify the TMs about the new (old) leader
 		cluster.notifyRetrievalListeners(0, newLeaderSessionID);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index cd89fa6..e596166 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -39,7 +39,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 
 	private final Configuration userConfiguration;
 	private final boolean useSingleActorSystem;
-	private final RestartStrategy restartStrategy;
 
 	public List<TestingLeaderElectionService> leaderElectionServices;
 	public List<TestingLeaderRetrievalService> leaderRetrievalServices;
@@ -49,8 +48,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 	public LeaderElectionRetrievalTestingCluster(
 			Configuration userConfiguration,
 			boolean singleActorSystem,
-			boolean synchronousDispatcher,
-			RestartStrategy restartStrategy) {
+			boolean synchronousDispatcher) {
 		super(userConfiguration, singleActorSystem, synchronousDispatcher);
 
 		this.userConfiguration = userConfiguration;
@@ -58,8 +56,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 
 		leaderElectionServices = new ArrayList<TestingLeaderElectionService>();
 		leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>();
-
-		this.restartStrategy = restartStrategy;
 	}
 
 	@Override
@@ -95,15 +91,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 				ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
 	}
 
-	@Override
-	public RestartStrategy getRestartStrategy(RestartStrategy other) {
-		if (this.restartStrategy != null) {
-			return this.restartStrategy;
-		} else {
-			return other;
-		}
-	}
-
 	public void grantLeadership(int index, UUID leaderSessionID) {
 		if(leaderIndex >= 0) {
 			// first revoke leadership

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index 690c042..64b25e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -201,7 +201,7 @@ public class TaskCancelTest {
 				if (status.status() == JobStatus.RUNNING) {
 					return;
 				}
-				else if (status.status().isTerminalState()) {
+				else if (status.status().isGloballyTerminalState()) {
 					throw new Exception("JobStatus changed to " + status.status()
 							+ " while waiting for job to start running.");
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
index 360cb1f..91114ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
@@ -79,7 +79,7 @@ public class JobManagerActorTestUtils {
 				if (jobStatus == expectedJobStatus) {
 					return;
 				}
-				else if (jobStatus.isTerminalState()) {
+				else if (jobStatus.isGloballyTerminalState()) {
 					throw new IllegalStateException("Job is in terminal state " + jobStatus + ", "
 							+ "but was waiting for " + expectedJobStatus + ".");
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 0c4ffb9..763bd36 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -186,10 +186,6 @@ class TestingCluster(
     None
   }
 
-  def getRestartStrategy(restartStrategy: RestartStrategy) = {
-    restartStrategy
-  }
-
   @throws(classOf[TimeoutException])
   @throws(classOf[InterruptedException])
   def waitForTaskManagersToBeAlive(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
index f2f761d..028045a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -86,7 +86,7 @@ public class JobManagerCommunicationUtils {
 		if (status == null) {
 			throw new Exception("Could not cancel job - no running jobs");	
 		}
-		else if (status.getJobState().isTerminalState()) {
+		else if (status.getJobState().isGloballyTerminalState()) {
 			throw new Exception("Could not cancel job - job is not running any more");
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index b4ffbd4..5a10604 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -25,7 +25,6 @@ import akka.actor.UntypedActor;
 import akka.testkit.TestActorRef;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -46,6 +45,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
@@ -119,10 +119,10 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 	// ---------------------------------------------------------------------------------------------
 
 	/**
-	 * Tests that the recovery state is cleaned up after a JobManager stops.
+	 * Tests that the HA job is not cleaned up when the jobmanager is stopped.
 	 */
 	@Test
-	public void testJobManagerCleanUp() throws Exception {
+	public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
 		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
 				ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
 
@@ -153,8 +153,9 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 			flink.shutdown();
 		}
 
-		// Verify that everything is clean
-		verifyCleanRecoveryState(config);
+		// verify that the persisted job data has not been removed from ZooKeeper when the JM has
+		// been shut down
+		verifyRecoveryState(config);
 	}
 
 	/**
@@ -225,6 +226,14 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 			if (!success) {
 				fail("Non-leading JM was still holding reference to the job graph.");
 			}
+
+			Future<Object> jobRemoved = leadingJobManager.ask(
+				new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
+				deadline.timeLeft());
+
+			leadingJobManager.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
+
+			Await.ready(jobRemoved, deadline.timeLeft());
 		}
 		finally {
 			flink.shutdown();
@@ -482,4 +491,36 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 		}
 	}
 
+	/**
+	 * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
+	 *
+	 * @param config configuration from which the ZooKeeper job graphs path is read
+	 * @throws Exception if listing the files or querying ZooKeeper fails
+	 */
+	private static void verifyRecoveryState(Configuration config) throws Exception {
+		// File state backend must still contain files
+		Collection<File> stateHandles = FileUtils.listFiles(
+			FileStateBackendBasePath, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+
+		if (stateHandles.isEmpty()) {
+			fail("File state backend has been cleaned: " + stateHandles);
+		}
+
+		// ZooKeeper
+		String currentJobsPath = config.getString(
+			ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+
+		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
+
+		if (stat == null) {
+			// checkExists() yields null for a missing node; report it instead of failing with an NPE
+			fail("ZooKeeper path '" + currentJobsPath + "' does not exist.");
+		}
+
+		if (stat.getCversion() == 0) {
+			// Sanity check: verify that some changes have been performed
+			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
+				"this test. What are you testing?");
+		}
+
+		if (stat.getNumChildren() == 0) {
+			// Children have been cleaned up?
+			fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " +
+				ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index f291c02..23b3adc 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -150,7 +150,7 @@ class YarnJobManager(
           log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
             s"job $stopWhenJobFinished")
         } else {
-          if (jobStatus.status.isTerminalState) {
+          if (jobStatus.status.isGloballyTerminalState) {
             log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
               s"Shutting down YARN session")
             if (jobStatus.status == JobStatus.FINISHED) {


Mime
View raw message