flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-5193] [jm] Harden job recovery in case of recovery failures
Date Thu, 01 Dec 2016 20:22:39 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 59f61bf6c -> 9c058871f


[FLINK-5193] [jm] Harden job recovery in case of recovery failures

When recovering multiple jobs a single recovery failure caused all jobs to be not recovered.
This PR changes this behaviour to make the recovery of jobs independent so that a single
failure won't stall the complete recovery. Furthermore, this PR improves the error reporting
for failures originating in the ZooKeeperSubmittedJobGraphStore.

Add test case

Fix failing JobManagerHACheckpointRecoveryITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d314bc52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d314bc52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d314bc52

Branch: refs/heads/release-1.1
Commit: d314bc5235e2573ff77f45d327bc62f521063b71
Parents: 59f61bf
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Nov 29 17:31:08 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Dec 1 17:53:34 2016 +0100

----------------------------------------------------------------------
 .../StandaloneSubmittedJobGraphStore.java       |  11 +-
 .../jobmanager/SubmittedJobGraphStore.java      |  19 ++-
 .../ZooKeeperSubmittedJobGraphStore.java        | 113 +++++++------
 .../zookeeper/ZooKeeperStateHandleStore.java    |  44 ++++-
 .../flink/runtime/jobmanager/JobManager.scala   |  45 ++---
 .../jobmanager/JobManagerHARecoveryTest.java    | 165 ++++++++++++++++++-
 .../StandaloneSubmittedJobGraphStoreTest.java   |  11 +-
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java |  29 ++--
 .../JobManagerHACheckpointRecoveryITCase.java   |   4 +-
 9 files changed, 315 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index db36f92..8267b9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -20,10 +20,9 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.Option;
 
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#STANDALONE}.
@@ -54,12 +53,12 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore
 	}
 
 	@Override
-	public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
-		return Option.empty();
+	public Collection<JobID> getJobIds() throws Exception {
+		return Collections.emptyList();
 	}
 
 	@Override
-	public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
-		return Collections.emptyList();
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index bd628cd..55c2e79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.Option;
 
-import java.util.List;
+import java.util.Collection;
 
 /**
  * {@link SubmittedJobGraph} instances for recovery.
@@ -40,16 +38,11 @@ public interface SubmittedJobGraphStore {
 	void stop() throws Exception;
 
 	/**
-	 * Returns a list of all submitted {@link JobGraph} instances.
-	 */
-	List<SubmittedJobGraph> recoverJobGraphs() throws Exception;
-
-	/**
 	 * Returns the {@link SubmittedJobGraph} with the given {@link JobID}.
 	 *
 	 * <p>An Exception is thrown, if no job graph with the given ID exists.
 	 */
-	Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception;
+	SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception;
 
 	/**
 	 * Adds the {@link SubmittedJobGraph} instance.
@@ -64,6 +57,14 @@ public interface SubmittedJobGraphStore {
 	void removeJobGraph(JobID jobId) throws Exception;
 
 	/**
+	 * Get all job ids of submitted job graphs to the submitted job graph store.
+	 *
+	 * @return Collection of submitted job ids
+	 * @throws Exception if the operation fails
+	 */
+	Collection<JobID> getJobIds() throws Exception;
+
+	/**
 	 * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
 	 * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7324c07..859d319 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -24,18 +24,15 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -156,73 +153,41 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore
{
 	}
 
 	@Override
-	public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+		checkNotNull(jobId, "Job ID");
+		String path = getPathForJob(jobId);
+
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
-			LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
-
-			List<Tuple2<StateHandle<SubmittedJobGraph>, String>> submitted;
-
-			while (true) {
-				try {
-					submitted = jobGraphsInZooKeeper.getAll();
-					break;
-				}
-				catch (ConcurrentModificationException e) {
-					LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
-				}
-			}
-
-			LOG.info("Found {} job graphs.", submitted.size());
-
-			if (submitted.size() != 0) {
-				List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
-
-				for (Tuple2<StateHandle<SubmittedJobGraph>, String> jobStateHandle : submitted)
{
-					SubmittedJobGraph jobGraph = jobStateHandle
-							.f0.getState(ClassLoader.getSystemClassLoader());
+			StateHandle<SubmittedJobGraph> submittedJobStateHandle;
 
-					addedJobGraphs.add(jobGraph.getJobId());
 
-					jobGraphs.add(jobGraph);
-				}
-
-				LOG.info("Recovered {} job graphs: {}.", jobGraphs.size(), jobGraphs);
-				return jobGraphs;
-			}
-			else {
-				LOG.info("No job graph to recover.");
-				return Collections.emptyList();
+			try {
+				submittedJobStateHandle = jobGraphsInZooKeeper.get(path);
+			} catch (KeeperException.NoNodeException ignored) {
+				return null;
+			} catch (Exception e) {
+				throw new Exception("Could not retrieve the submitted job graph state handle " +
+					"for " + path + "from the submitted job graph store.", e);
 			}
-		}
-	}
-
-	@Override
-	public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
-		checkNotNull(jobId, "Job ID");
-		String path = getPathForJob(jobId);
+			
+			SubmittedJobGraph jobGraph;
 
-		LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+			LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
 
-		synchronized (cacheLock) {
-			verifyIsRunning();
 
 			try {
-				StateHandle<SubmittedJobGraph> jobStateHandle = jobGraphsInZooKeeper.get(path);
-
-				SubmittedJobGraph jobGraph = jobStateHandle
-						.getState(ClassLoader.getSystemClassLoader());
+				jobGraph = submittedJobStateHandle.getState(getClass().getClassLoader());
+			} catch (Exception e) {
+				throw new Exception("Failed to retrieve the submitted job graph from state handle.",
e);
+			}
 
-				addedJobGraphs.add(jobGraph.getJobId());
+			addedJobGraphs.add(jobGraph.getJobId());
 
-				LOG.info("Recovered {}.", jobGraph);
+			LOG.info("Recovered {}.", jobGraph);
 
-				return Option.apply(jobGraph);
-			}
-			catch (KeeperException.NoNodeException ignored) {
-				return Option.empty();
-			}
+			return jobGraph;
 		}
 	}
 
@@ -290,6 +255,29 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore
{
 		LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 	}
 
+	@Override
+	public Collection<JobID> getJobIds() throws Exception {
+		Collection<String> paths;
+
+		try {
+			paths = jobGraphsInZooKeeper.getAllPaths();
+		} catch (Exception e) {
+			throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.",
e);
+		}
+
+		List<JobID> jobIds = new ArrayList<>(paths.size());
+
+		for (String path : paths) {
+			try {
+				jobIds.add(jobIdfromPath(path));
+			} catch (Exception exception) {
+				LOG.warn("Could not parse job id from {}.", path, exception);
+			}
+		}
+
+		return jobIds;
+	}
+
 	/**
 	 * Monitors ZooKeeper for changes.
 	 *
@@ -412,4 +400,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore
{
 		return String.format("/%s", jobId);
 	}
 
+	/**
+	 * Returns the JobID from the given path in ZooKeeper.
+	 *
+	 * @param path in ZooKeeper
+	 * @return JobID associated with the given path
+	 */
+	public static JobID jobIdfromPath(final String path) {
+		return JobID.fromHexString(path);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 6576ff8..0d63a15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -30,8 +30,11 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -226,10 +229,45 @@ public class ZooKeeperStateHandleStore<T extends Serializable>
{
 	public StateHandle<T> get(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-		byte[] data = client.getData().forPath(pathInZooKeeper);
+		byte[] data;
 
-		return (StateHandle<T>) InstantiationUtil
-				.deserializeObject(data, ClassLoader.getSystemClassLoader());
+		try {
+			data = client.getData().forPath(pathInZooKeeper);
+		} catch (Exception e) {
+			throw new Exception("Failed to retrieve state handle data under " + pathInZooKeeper +
+				" from ZooKeeper.", e);
+		}
+
+		try {
+			return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader());
+		} catch (IOException | ClassNotFoundException e) {
+			throw new Exception("Failed to deserialize state handle from ZooKeeper data from " +
+				pathInZooKeeper + '.', e);
+		}
+	}
+
+	/**
+	 * Return a list of all valid paths for state handles.
+	 *
+	 * @return List of valid state handle paths in ZooKeeper
+	 * @throws Exception if a ZooKeeper operation fails
+	 */
+	public Collection<String> getAllPaths() throws Exception {
+		final String path = "/";
+
+		while(true) {
+			Stat stat = client.checkExists().forPath(path);
+
+			if (stat == null) {
+				return Collections.emptyList();
+			} else {
+				try {
+					return client.getChildren().forPath(path);
+				} catch (KeeperException.NoNodeException ignored) {
+					// Concurrent deletion, retry
+				}
+			}
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9061db4..9f6e2db 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -471,6 +471,8 @@ class JobManager(
 
     case RecoverSubmittedJob(submittedJobGraph) =>
       if (!currentJobs.contains(submittedJobGraph.getJobId)) {
+        log.info(s"Submitting recovered job ${submittedJobGraph.getJobId}.")
+
         submitJob(
           submittedJobGraph.getJobGraph(),
           submittedJobGraph.getJobInfo(),
@@ -492,7 +494,7 @@ class JobManager(
             log.info(s"Attempting to recover job $jobId.")
             val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId)
 
-            submittedJobGraphOption match {
+            Option(submittedJobGraphOption) match {
               case Some(submittedJobGraph) =>
                 if (!leaderElectionService.hasLeadership()) {
                   // we've lost leadership. mission: abort.
@@ -505,37 +507,31 @@ class JobManager(
             }
           }
         } catch {
-          case t: Throwable => log.error(s"Failed to recover job $jobId.", t)
+          case t: Throwable => log.warn(s"Failed to recover job $jobId.", t)
         }
       }(context.dispatcher)
 
     case RecoverAllJobs =>
       future {
-        try {
-          // The ActorRef, which is part of the submitted job graph can only be
-          // de-serialized in the scope of an actor system.
-          akka.serialization.JavaSerializer.currentSystem.withValue(
-            context.system.asInstanceOf[ExtendedActorSystem]) {
+        log.info("Attempting to recover all jobs.")
 
-            log.info(s"Attempting to recover all jobs.")
-
-            val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
+        try {
+          val jobIdsToRecover = submittedJobGraphs.getJobIds().asScala
 
-            if (!leaderElectionService.hasLeadership()) {
-              // we've lost leadership. mission: abort.
-              log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size}
" +
-                s"jobs.")
-            } else {
-              log.info(s"Re-submitting ${jobGraphs.size} job graphs.")
+          if (jobIdsToRecover.isEmpty) {
+            log.info("There are no jobs to recover.")
+          } else {
+            log.info(s"There are ${jobIdsToRecover.size} jobs to recover. Starting the job
" +
+                       s"recovery.")
 
-              jobGraphs.foreach{
-                submittedJobGraph =>
-                  self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
-              }
+            jobIdsToRecover foreach {
+              jobId => self ! decorateMessage(RecoverJob(jobId))
             }
           }
         } catch {
-          case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t)
+          case e: Exception =>
+            log.warn("Failed to recover job ids from submitted job graph store. Aborting
" +
+                       "recovery.", e)
         }
       }(context.dispatcher)
 
@@ -1039,7 +1035,12 @@ class JobManager(
    * @param jobInfo the job info
    * @param isRecovery Flag indicating whether this is a recovery or initial submission
    */
-  private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false):
Unit = {
+  private def submitJob(
+      jobGraph: JobGraph,
+      jobInfo: JobInfo,
+      isRecovery: Boolean = false)
+    : Unit = {
+
     if (jobGraph == null) {
       jobInfo.client ! decorateMessage(JobResultFailure(
         new SerializedThrowable(

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index b98f338..b78f1fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -20,8 +20,13 @@ package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.japi.pf.FI;
+import akka.japi.pf.ReceiveBuilder;
+import akka.pattern.Patterns;
+import akka.testkit.CallingThreadDispatcher;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
@@ -29,15 +34,18 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceManager;
@@ -49,10 +57,12 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -62,6 +72,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -69,25 +80,35 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import scala.Int;
 import scala.Option;
+import scala.PartialFunction;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
+import scala.runtime.BoxedUnit;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class JobManagerHARecoveryTest {
 
@@ -288,6 +309,134 @@ public class JobManagerHARecoveryTest {
 	}
 
 	/**
+	 * Tests that a failing job recovery won't cause other job recoveries to fail.
+	 */
+	@Test
+	public void testFailingJobRecovery() throws Exception {
+		final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+		final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS);
+		Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow();
+		final Configuration flinkConfiguration = new Configuration();
+		UUID leaderSessionID = UUID.randomUUID();
+		ActorRef jobManager = null;
+		JobID jobId1 = new JobID();
+		JobID jobId2 = new JobID();
+
+		// set HA mode to zookeeper so that we try to recover jobs
+		flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+
+		try {
+			final SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
+
+			SubmittedJobGraph submittedJobGraph = mock(SubmittedJobGraph.class);
+			when(submittedJobGraph.getJobId()).thenReturn(jobId2);
+
+			when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(jobId1, jobId2));
+
+			// fail the first job recovery
+			when(submittedJobGraphStore.recoverJobGraph(eq(jobId1))).thenThrow(new Exception("Test
exception"));
+			// succeed the second job recovery
+			when(submittedJobGraphStore.recoverJobGraph(eq(jobId2))).thenReturn(submittedJobGraph);
+
+			final TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
+
+			final Collection<JobID> recoveredJobs = new ArrayList<>(2);
+
+			Props jobManagerProps = Props.create(
+				TestingFailingHAJobManager.class,
+				flinkConfiguration,
+				TestExecutors.directExecutor(),
+				TestExecutors.directExecutor(),
+				mock(InstanceManager.class),
+				mock(Scheduler.class),
+				new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
+				ActorRef.noSender(),
+				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+				timeout,
+				myLeaderElectionService,
+				submittedJobGraphStore,
+				mock(CheckpointRecoveryFactory.class),
+				mock(SavepointStore.class),
+				jobRecoveryTimeout,
+				Option.<MetricRegistry>apply(null),
+				recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
+
+			jobManager = system.actorOf(jobManagerProps, "jobmanager");
+
+			Future<Object> started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());
+
+			Await.ready(started, deadline.timeLeft());
+
+			// make the job manager the leader --> this triggers the recovery of all jobs
+			myLeaderElectionService.isLeader(leaderSessionID);
+
+			// check that we have successfully recovered the second job
+			assertThat(recoveredJobs, containsInAnyOrder(jobId2));
+		} finally {
+			TestingUtils.stopActor(jobManager);
+		}
+	}
+
+	static class TestingFailingHAJobManager extends JobManager {
+
+		private final Collection<JobID> recoveredJobs;
+
+		public TestingFailingHAJobManager(
+			Configuration flinkConfiguration,
+			Executor futureExecutor,
+			Executor ioExecutor,
+			InstanceManager instanceManager,
+			Scheduler scheduler,
+			BlobLibraryCacheManager libraryCacheManager,
+			ActorRef archive,
+			RestartStrategyFactory restartStrategyFactory,
+			FiniteDuration timeout,
+			LeaderElectionService leaderElectionService,
+			SubmittedJobGraphStore submittedJobGraphs,
+			CheckpointRecoveryFactory checkpointRecoveryFactory,
+			SavepointStore savepointStore,
+			FiniteDuration jobRecoveryTimeout,
+			Option<MetricRegistry> metricRegistry,
+			Collection<JobID> recoveredJobs) {
+			super(
+				flinkConfiguration,
+				futureExecutor,
+				ioExecutor,
+				instanceManager,
+				scheduler,
+				libraryCacheManager,
+				archive,
+				restartStrategyFactory,
+				timeout,
+				leaderElectionService,
+				submittedJobGraphs,
+				checkpointRecoveryFactory,
+				savepointStore,
+				jobRecoveryTimeout,
+				metricRegistry);
+
+			this.recoveredJobs = recoveredJobs;
+		}
+
+		@Override
+		public PartialFunction<Object, BoxedUnit> handleMessage() {
+			return ReceiveBuilder.match(
+				JobManagerMessages.RecoverSubmittedJob.class,
+				new FI.UnitApply<JobManagerMessages.RecoverSubmittedJob>() {
+					@Override
+					public void apply(JobManagerMessages.RecoverSubmittedJob submitJob) throws Exception
{
+						recoveredJobs.add(submitJob.submittedJobGraph().getJobId());
+					}
+				}).matchAny(new FI.UnitApply<Object>() {
+				@Override
+				public void apply(Object o) throws Exception {
+					TestingFailingHAJobManager.super.handleMessage().apply(o);
+				}
+			}).build();
+		}
+	}
+
+	/**
 	 * A checkpoint store, which supports shutdown and suspend. You can use this to test HA
 	 * as long as the factory always returns the same store instance.
 	 */
@@ -383,16 +532,11 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
-			return new ArrayList<>(storedJobs.values());
-		}
-
-		@Override
-		public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+		public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
 			if (storedJobs.containsKey(jobId)) {
-				return Option.apply(storedJobs.get(jobId));
+				return storedJobs.get(jobId);
 			} else {
-				return Option.apply(null);
+				return null;
 			}
 		}
 
@@ -406,6 +550,11 @@ public class JobManagerHARecoveryTest {
 			storedJobs.remove(jobId);
 		}
 
+		@Override
+		public Collection<JobID> getJobIds() throws Exception {
+			return storedJobs.keySet();
+		}
+
 		boolean contains(JobID jobId) {
 			return storedJobs.containsKey(jobId);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
index 8ebb7f8..079a10e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
@@ -19,14 +19,13 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
 
 public class StandaloneSubmittedJobGraphStoreTest {
 
@@ -41,14 +40,14 @@ public class StandaloneSubmittedJobGraphStoreTest {
 				new JobGraph("testNoOps"),
 				new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE));
 
-		assertEquals(0, jobGraphs.recoverJobGraphs().size());
+		assertEquals(0, jobGraphs.getJobIds().size());
 
 		jobGraphs.putJobGraph(jobGraph);
-		assertEquals(0, jobGraphs.recoverJobGraphs().size());
+		assertEquals(0, jobGraphs.getJobIds().size());
 
 		jobGraphs.removeJobGraph(jobGraph.getJobGraph().getJobID());
-		assertEquals(0, jobGraphs.recoverJobGraphs().size());
+		assertEquals(0, jobGraphs.getJobIds().size());
 
-		assertTrue(jobGraphs.recoverJobGraph(new JobID()).isEmpty());
+		assertNull(jobGraphs.recoverJobGraph(new JobID()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 8eaecd0..cc9e815 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -36,8 +36,8 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
@@ -93,32 +93,36 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger
{
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
 
 			// Empty state
-			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+			assertEquals(0, jobGraphs.getJobIds().size());
 
 			// Add initial
 			jobGraphs.putJobGraph(jobGraph);
 
 			// Verify initial job graph
-			List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
-			assertEquals(1, actual.size());
+			Collection<JobID> jobIds = jobGraphs.getJobIds();
+			assertEquals(1, jobIds.size());
 
-			verifyJobGraphs(jobGraph, actual.get(0));
+			JobID jobId = jobIds.iterator().next();
+
+			verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
 
 			// Update (same ID)
 			jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1);
 			jobGraphs.putJobGraph(jobGraph);
 
 			// Verify updated
-			actual = jobGraphs.recoverJobGraphs();
-			assertEquals(1, actual.size());
+			jobIds = jobGraphs.getJobIds();
+			assertEquals(1, jobIds.size());
+
+			jobId = jobIds.iterator().next();
 
-			verifyJobGraphs(jobGraph, actual.get(0));
+			verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
 
 			// Remove
 			jobGraphs.removeJobGraph(jobGraph.getJobId());
 
 			// Empty state
-			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+			assertEquals(0, jobGraphs.getJobIds().size());
 
 			// Nothing should have been notified
 			verify(listener, atMost(1)).onAddedJobGraph(any(JobID.class));
@@ -154,11 +158,12 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger
{
 				jobGraphs.putJobGraph(jobGraph);
 			}
 
-			List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
+			Collection<JobID> actual = jobGraphs.getJobIds();
 
 			assertEquals(expected.size(), actual.size());
 
-			for (SubmittedJobGraph jobGraph : actual) {
+			for (JobID jobId : actual) {
+				SubmittedJobGraph jobGraph = jobGraphs.recoverJobGraph(jobId);
 				assertTrue(expected.containsKey(jobGraph.getJobId()));
 
 				verifyJobGraphs(expected.get(jobGraph.getJobId()), jobGraph);
@@ -167,7 +172,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger
{
 			}
 
 			// Empty state
-			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+			assertEquals(0, jobGraphs.getJobIds().size());
 
 			// Nothing should have been notified
 			verify(listener, atMost(expected.size())).onAddedJobGraph(any(JobID.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 262f78a..e598ac5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -364,7 +364,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 				nonLeadingJobManagerProcess = jobManagerProcess[0];
 			}
 
-			// BLocking JobGraph
+			// Blocking JobGraph
 			JobVertex blockingVertex = new JobVertex("Blocking vertex");
 			blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
 			JobGraph jobGraph = new JobGraph(blockingVertex);
@@ -393,7 +393,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 				String output = nonLeadingJobManagerProcess.getProcessOutput();
 
 				if (output != null) {
-					if (output.contains("Fatal error: Failed to recover jobs") &&
+					if (output.contains("Failed to recover job") &&
 							output.contains("java.io.FileNotFoundException")) {
 
 						success = true;


Mime
View raw message