flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [26/47] flink git commit: [FLINK-2804] [runtime] Add blocking job submission support for HA
Date Tue, 20 Oct 2015 07:59:17 GMT
[FLINK-2804] [runtime] Add blocking job submission support for HA

The JobClientActor is now responsible for receiving the JobStatus updates from
a newly elected leader. It uses the LeaderRetrievalService to be notified about
new leaders. The actor can only be used to submit a single job to the JM. Once
it has received a job from the Client, it tries to send it to the current leader.
If no leader is available, a connection timeout is triggered. If the job could
be sent to the JM, a submission timeout is triggered if the JobClientActor does
not receive a JobSubmitSuccess message within the timeout interval. If the
connection to the leader is lost after having submitted a job, a connection
timeout is triggered if the JobClientActor cannot reconnect to another JM within
the timeout interval. The JobClient simply awaits the completion of the
future returned for the SubmitJobAndWait message.

Added test cases for JobClientActor exceptions

This closes #1249.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d18f5809
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d18f5809
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d18f5809

Branch: refs/heads/master
Commit: d18f5809808221f6bd7c045aa78dfe01dab2afdf
Parents: c3a4d1d
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Oct 8 01:52:07 2015 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Oct 20 00:16:52 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java | 108 +++---
 .../RemoteExecutorHostnameResolutionTest.java   |   6 +-
 .../apache/flink/runtime/client/JobClient.java  |  50 +--
 .../flink/runtime/client/JobClientActor.java    | 360 +++++++++++++++----
 ...obClientActorConnectionTimeoutException.java |  35 ++
 ...obClientActorSubmissionTimeoutException.java |  35 ++
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +
 .../runtime/messages/JobClientMessages.scala    |  26 +-
 .../runtime/messages/JobManagerMessages.scala   |   6 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  24 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   7 +-
 .../client/JobClientActorRecoveryITCase.java    | 163 +++++++++
 .../runtime/client/JobClientActorTest.java      | 228 ++++++++++++
 .../PartialConsumePipelinedResultTest.java      |  13 +-
 .../JobManagerLeaderElectionTest.java           |  25 +-
 .../LeaderChangeStateCleanupTest.java           |   9 +-
 .../LeaderElectionRetrievalTestingCluster.java  |  25 +-
 .../TestingLeaderElectionService.java           |   5 +-
 .../TestingLeaderRetrievalService.java          |  16 +
 .../testingUtils/TestingJobManagerLike.scala    |  15 +
 .../TestingJobManagerMessages.scala             |   9 +-
 .../ZooKeeperLeaderElectionITCase.java          | 117 +++---
 .../src/test/resources/log4j-test.properties    |   7 +-
 23 files changed, 1008 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index dfb9c1b..322c73d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -24,8 +24,8 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -47,7 +47,6 @@ import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -81,12 +80,15 @@ public class Client {
 	/** The actor system used to communicate with the JobManager */
 	private final ActorSystem actorSystem;
 
-	/** The actor reference to the JobManager */
-	private final ActorGateway jobManagerGateway;
+	/** Configuration of the client */
+	private final Configuration config;
 
-	/** The timeout for communication between the client and the JobManager */
+	/** Timeout for futures */
 	private final FiniteDuration timeout;
-	
+
+	/** Lookup timeout for the job manager retrieval service */
+	private final FiniteDuration lookupTimeout;
+
 	/**
 	 * If != -1, this field specifies the total number of available slots on the cluster
 	 * connected to the client.
@@ -133,6 +135,7 @@ public class Client {
 	 */
 	public Client(Configuration config, int maxSlots) throws IOException {
 
+		this.config = Preconditions.checkNotNull(config);
 		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
 		this.maxSlots = maxSlots;
 
@@ -144,50 +147,8 @@ public class Client {
 			throw new IOException("Could start client actor system.", e);
 		}
 
-		// from here on, we need to make sure the actor system is shut down on error
-		boolean success = false;
-
-		try {
-
-			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(config);
-			this.timeout = AkkaUtils.getTimeout(config);
-
-			LOG.info("Looking up JobManager");
-			LeaderRetrievalService leaderRetrievalService;
-
-			try {
-				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
-			} catch (Exception e) {
-				throw new IOException("Could not create the leader retrieval service.", e);
-			}
-
-			try {
-				this.jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-						leaderRetrievalService,
-						actorSystem,
-						lookupTimeout);
-			} catch (LeaderRetrievalException e) {
-				throw new IOException("Failed to retrieve JobManager gateway", e);
-			}
-
-			LOG.info("Leading JobManager actor system address is " + this.jobManagerGateway.path());
-
-			LOG.info("JobManager runs at " + this.jobManagerGateway.path());
-
-			LOG.info("Communication between client and JobManager will have a timeout of " + this.timeout);
-			success = true;
-		} finally {
-			if (!success) {
-				try {
-					this.actorSystem.shutdown();
-
-					// wait at most for 30 seconds, to work around an occasional akka problem
-					actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
-				} catch (Throwable t) {
-					LOG.error("Shutting down actor system after error caused another error", t);
-				}
-			}
-		}
+		timeout = AkkaUtils.getTimeout(config);
+		lookupTimeout = AkkaUtils.getTimeout(config);
 	}
 	// ------------------------------------------------------------------------
 	//  Startup & Shutdown
@@ -395,21 +356,30 @@ public class Client {
 	}
 
 	public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		LOG.info("Checking and uploading JAR files");
+		LeaderRetrievalService leaderRetrievalService;
 		try {
-			JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
-		} catch (IOException e) {
-			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+		} catch (Exception e) {
+			throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
 		}
+
 		try {
 			this.lastJobID = jobGraph.getJobID();
-			return JobClient.submitJobAndWait(actorSystem, jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, classLoader);
+			return JobClient.submitJobAndWait(actorSystem, leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
 		} catch (JobExecutionException e) {
 			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
 		}
 	}
 
 	public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		ActorGateway jobManagerGateway;
+
+		try {
+			jobManagerGateway = getJobManagerGateway();
+		} catch (Exception e) {
+			throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
+		}
+
 		LOG.info("Checking and uploading JAR files");
 		try {
 			JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
@@ -432,6 +402,8 @@ public class Client {
 	 * @throws Exception In case an error occurred.
 	 */
 	public void cancel(JobID jobId) throws Exception {
+		ActorGateway jobManagerGateway = getJobManagerGateway();
+
 		Future<Object> response;
 		try {
 			response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
@@ -471,6 +443,7 @@ public class Client {
 	 * @return A Map containing the accumulator's name and its value.
 	 */
 	public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
+		ActorGateway jobManagerGateway = getJobManagerGateway();
 
 		Future<Object> response;
 		try {
@@ -520,6 +493,8 @@ public class Client {
 		if (jobIds == null) {
 			throw new IllegalArgumentException("The JobIDs must not be null");
 		}
+
+		ActorGateway jobManagerGateway = getJobManagerGateway();
 		
 		for (JobID jid : jobIds) {
 			if (jid != null) {
@@ -572,4 +547,27 @@ public class Client {
 		return job;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Helper methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the {@link ActorGateway} of the current job manager leader using
+	 * the {@link LeaderRetrievalService}.
+	 *
+	 * @return ActorGateway of the current job manager leader
+	 * @throws Exception
+	 */
+	private ActorGateway getJobManagerGateway() throws Exception {
+		LOG.info("Looking up JobManager");
+		LeaderRetrievalService leaderRetrievalService;
+
+		leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+
+		return LeaderRetrievalUtils.retrieveLeaderGateway(
+			leaderRetrievalService,
+			actorSystem,
+			lookupTimeout);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index 424f72e..fb5200b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -21,10 +21,10 @@ package org.apache.flink.client;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
@@ -51,7 +51,7 @@ public class RemoteExecutorHostnameResolutionTest {
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}
-		catch (IOException e) {
+		catch (ProgramInvocationException e) {
 			// that is what we want!
 			assertTrue(e.getCause() instanceof UnknownHostException);
 		}
@@ -74,7 +74,7 @@ public class RemoteExecutorHostnameResolutionTest {
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}
-		catch (IOException e) {
+		catch (ProgramInvocationException e) {
 			// that is what we want!
 			assertTrue(e.getCause() instanceof UnknownHostException);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index a436881..0105632 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -28,12 +28,12 @@ import akka.util.Timeout;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -52,7 +52,6 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.concurrent.TimeoutException;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -85,43 +84,14 @@ public class JobClient {
 	}
 
 	/**
-	 * Extracts the JobManager's Akka URL from the configuration. If localActorSystem is true, then
-	 * the JobClient is executed in the same actor system as the JobManager. Thus, they can
-	 * communicate locally.
-	 *
-	 * @param config Configuration object containing all user provided configuration values
-	 * @return The socket address of the JobManager actor system
-	 */
-	public static InetSocketAddress getJobManagerAddress(Configuration config) throws IOException {
-		String jobManagerAddress = config.getString(
-				ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-
-		int jobManagerRPCPort = config.getInteger(
-				ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-
-		if (jobManagerAddress == null) {
-			throw new RuntimeException(
-					"JobManager address has not been specified in the configuration.");
-		}
-
-		try {
-			return new InetSocketAddress(
-					InetAddress.getByName(jobManagerAddress), jobManagerRPCPort);
-		}
-		catch (UnknownHostException e) {
-			throw new IOException("Cannot resolve JobManager hostname " + jobManagerAddress, e);
-		}
-	}
-
-	/**
 	 * Sends a [[JobGraph]] to the JobClient actor specified by jobClient which submits it then to
 	 * the JobManager. The method blocks until the job has finished or the JobManager is no longer
 	 * alive. In the former case, the [[SerializedJobExecutionResult]] is returned and in the latter
 	 * case a [[JobExecutionException]] is thrown.
 	 *
 	 * @param actorSystem The actor system that performs the communication.
-	 * @param jobManagerGateway  Gateway to the JobManager that should execute the job.
+	 * @param leaderRetrievalService Leader retrieval service which used to find the current leading
+	 *                               JobManager
 	 * @param jobGraph    JobGraph describing the Flink job
 	 * @param timeout     Timeout for futures
 	 * @param sysoutLogUpdates prints log updates to system out if true
@@ -131,14 +101,14 @@ public class JobClient {
 	 */
 	public static JobExecutionResult submitJobAndWait(
 			ActorSystem actorSystem,
-			ActorGateway jobManagerGateway,
+			LeaderRetrievalService leaderRetrievalService,
 			JobGraph jobGraph,
 			FiniteDuration timeout,
 			boolean sysoutLogUpdates,
 			ClassLoader classLoader) throws JobExecutionException {
 
 		checkNotNull(actorSystem, "The actorSystem must not be null.");
-		checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
+		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
 		checkNotNull(jobGraph, "The jobGraph must not be null.");
 		checkNotNull(timeout, "The timeout must not be null.");
 
@@ -146,12 +116,10 @@ public class JobClient {
 		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
 		// update messages, watches for disconnect between client and JobManager, ...
 
-		Props jobClientActorProps = Props.create(
-				JobClientActor.class,
-				jobManagerGateway.actor(),
-				LOG,
-				sysoutLogUpdates,
-				jobManagerGateway.leaderSessionID());
+		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
+			leaderRetrievalService,
+			timeout,
+			sysoutLogUpdates);
 
 		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index bf747c4..d08046b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -20,45 +20,93 @@ package org.apache.flink.runtime.client;
 
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
+import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Terminated;
+import akka.dispatch.Futures;
+import akka.dispatch.OnSuccess;
 import com.google.common.base.Preconditions;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.messages.JobClientMessages;
+import org.apache.flink.runtime.messages.JobClientMessages.JobManagerActorRef;
+import org.apache.flink.runtime.messages.JobClientMessages.JobManagerLeaderAddress;
+import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.slf4j.Logger;
+import org.apache.flink.runtime.util.SerializedThrowable;
+import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 
 /**
  * Actor which constitutes the bridge between the non-actor code and the JobManager. The JobClient
  * is used to submit jobs to the JobManager and to request the port of the BlobManager.
  */
-public class JobClientActor extends FlinkUntypedActor {
-	
-	private final ActorRef jobManager;
-	private final Logger logger;
+public class JobClientActor extends FlinkUntypedActor implements LeaderRetrievalListener {
+
+	private final LeaderRetrievalService leaderRetrievalService;
+
+	/** timeout for futures */
+	private final FiniteDuration timeout;
+
+	/** true if status messages shall be printed to sysout */
 	private final boolean sysoutUpdates;
 
+	/** true if a SubmitJobSuccess message has been received */
+	private boolean jobSuccessfullySubmitted = false;
+
+	/** true if a PoisonPill was taken */
+	private boolean terminated = false;
+
+	/** ActorRef to the current leader */
+	private ActorRef jobManager;
+
 	/** leader session ID of the JobManager when this actor was created */
-	private final UUID leaderSessionID;
+	private UUID leaderSessionID;
 
 	/** Actor which submits a job to the JobManager via this actor */
 	private ActorRef submitter;
 
-	public JobClientActor(ActorRef jobManager, Logger logger, boolean sysoutUpdates,
-			UUID leaderSessionID) {
+	/** JobGraph which shall be submitted to the JobManager */
+	private JobGraph jobGraph;
 
-		this.jobManager = Preconditions.checkNotNull(jobManager, "The JobManager ActorRef must not be null.");
-		this.logger = Preconditions.checkNotNull(logger, "The logger must not be null.");
-
-		this.leaderSessionID = leaderSessionID;
+	public JobClientActor(
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration submissionTimeout,
+			boolean sysoutUpdates) {
+		this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
+		this.timeout = Preconditions.checkNotNull(submissionTimeout);
 		this.sysoutUpdates = sysoutUpdates;
 	}
-	
+
+	@Override
+	public void preStart() {
+		try {
+			leaderRetrievalService.start(this);
+		} catch (Exception e) {
+			LOG.error("Could not start the leader retrieval service.");
+			throw new RuntimeException("Could not start the leader retrieval service.", e);
+		}
+	}
+
+	@Override
+	public void postStop() {
+		try {
+			leaderRetrievalService.stop();
+		} catch (Exception e) {
+			LOG.warn("Could not properly stop the leader retrieval service.");
+		}
+	}
+
 	@Override
 	protected void handleMessage(Object message) {
 		
@@ -66,50 +114,79 @@ public class JobClientActor extends FlinkUntypedActor {
 
 		if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
 			logAndPrintMessage(message);
-		}
-		else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
+		} else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
 			logAndPrintMessage(message);
 		}
 
+		// ============ JobManager ActorRef resolution ===============
+
+		else if (message instanceof JobManagerLeaderAddress) {
+			JobManagerLeaderAddress msg = (JobManagerLeaderAddress) message;
+
+			disconnectFromJobManager();
+
+			this.leaderSessionID = msg.leaderSessionID();
+
+			if (msg.address() != null) {
+				// Resolve the job manager leader address to obtain an ActorRef
+				AkkaUtils.getActorRefFuture(msg.address(), getContext().system(), timeout)
+					.onSuccess(new OnSuccess<ActorRef>() {
+						@Override
+						public void onSuccess(ActorRef result) throws Throwable {
+							getSelf().tell(decorateMessage(new JobManagerActorRef(result)), ActorRef.noSender());
+						}
+					}, getContext().dispatcher());
+			}
+		} else if (message instanceof JobManagerActorRef) {
+			// Resolved JobManager ActorRef
+			JobManagerActorRef msg = (JobManagerActorRef) message;
+			connectToJobManager(msg.jobManager());
+
+			if (jobGraph != null && !jobSuccessfullySubmitted) {
+				// if we haven't yet submitted the job successfully
+				tryToSubmitJob(jobGraph);
+			}
+		}
+
 		// =========== Job Life Cycle Messages ===============
 		
 		// submit a job to the JobManager
-		else if (message instanceof JobClientMessages.SubmitJobAndWait) {
-			// sanity check that this no job was submitted through this actor before -
-			// it is a one-shot actor after all
-			if (this.submitter == null) {
-				JobGraph jobGraph = ((JobClientMessages.SubmitJobAndWait) message).jobGraph();
-				if (jobGraph == null) {
-					logger.error("Received null JobGraph");
-					sender().tell(
+		else if (message instanceof SubmitJobAndWait) {
+			// only accept SubmitJobWait messages if we're not about to terminate
+			if (!terminated) {
+				// sanity check that this no job was submitted through this actor before -
+				// it is a one-shot actor after all
+				if (this.submitter == null) {
+					jobGraph = ((SubmitJobAndWait) message).jobGraph();
+					if (jobGraph == null) {
+						LOG.error("Received null JobGraph");
+						sender().tell(
 							decorateMessage(new Status.Failure(new Exception("JobGraph is null"))),
 							getSelf());
-				}
-				else {
-					logger.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress",
-							jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID());
+					} else {
+						LOG.info("Received job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
 
-					this.submitter = getSender();
-					jobManager.tell(
-						decorateMessage(
-							new JobManagerMessages.SubmitJob(
-								jobGraph,
-								ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
-							getSelf());
-					
-					// make sure we notify the sender when the connection got lost
-					getContext().watch(jobManager);
+						this.submitter = getSender();
+
+						// is only successful if we already know the job manager leader
+						tryToSubmitJob(jobGraph);
+					}
+				} else {
+					// repeated submission - tell failure to sender and kill self
+					String msg = "Received repeated 'SubmitJobAndWait'";
+					LOG.error(msg);
+					getSender().tell(
+						decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
+
+					terminate();
 				}
-			}
-			else {
-				// repeated submission - tell failure to sender and kill self
-				String msg = "Received repeated 'SubmitJobAndWait'";
-				logger.error(msg);
+			} else {
+				// we're about to receive a PoisonPill because terminated == true
+				String msg = getClass().getName() + " is about to be terminated. Therefore, the " +
+					"job submission cannot be executed.";
+				LOG.error(msg);
 				getSender().tell(
 					decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
-
-				getContext().unwatch(jobManager);
-				getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
 			}
 		}
 		// acknowledgement to submit job is only logged, our original
@@ -117,41 +194,80 @@ public class JobClientActor extends FlinkUntypedActor {
 		else if (message instanceof JobManagerMessages.JobResultSuccess ||
 				message instanceof JobManagerMessages.JobResultFailure) {
 			
-			if (logger.isDebugEnabled()) {
-				logger.debug("Received {} message from JobManager", message.getClass().getSimpleName());
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Received {} message from JobManager", message.getClass().getSimpleName());
 			}
 
 			// forward the success to the original job submitter
-			if (this.submitter != null) {
+			if (hasJobBeenSubmitted()) {
 				this.submitter.tell(decorateMessage(message), getSelf());
 			}
-			
-			// we are done, stop ourselves
-			getContext().unwatch(jobManager);
-			getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
+
+			terminate();
 		}
 		else if (message instanceof JobManagerMessages.JobSubmitSuccess) {
 			// job was successfully submitted :-)
-			logger.info("Job was successfully submitted to the JobManager");
+			LOG.info("Job was successfully submitted to the JobManager {}.", getSender().path());
+			jobSuccessfullySubmitted = true;
 		}
 
-		// =========== Actor / Communication Failure ===============
+		// =========== Actor / Communication Failure / Timeouts ===============
 		
 		else if (message instanceof Terminated) {
 			ActorRef target = ((Terminated) message).getActor();
 			if (jobManager.equals(target)) {
-				String msg = "Lost connection to JobManager " + jobManager.path();
-				logger.info(msg);
-				submitter.tell(decorateMessage(new Status.Failure(new Exception(msg))), getSelf());
+				LOG.info("Lost connection to JobManager {}. Triggering connection timeout.",
+					jobManager.path());
+				disconnectFromJobManager();
+
+				// we only issue a connection timeout if we have submitted a job before
+				// otherwise, we might have some more time to find another job manager
+				// Important: The ConnectionTimeout message is filtered out in case that we are
+				// notified about a new leader by setting the new leader session ID, because
+				// ConnectionTimeout extends RequiresLeaderSessionID
+				if (hasJobBeenSubmitted()) {
+					getContext().system().scheduler().scheduleOnce(
+						timeout,
+						getSelf(),
+						decorateMessage(JobClientMessages.getConnectionTimeout()),
+						getContext().dispatcher(),
+						ActorRef.noSender());
+				}
 			} else {
-				logger.error("Received 'Terminated' for unknown actor " + target);
+				LOG.warn("Received 'Terminated' for unknown actor " + target);
+			}
+		} else if (JobClientMessages.getConnectionTimeout().equals(message)) {
+			// check if we haven't found a job manager yet
+			if (!isConnected()) {
+				if (hasJobBeenSubmitted()) {
+					submitter.tell(
+						decorateMessage(new Status.Failure(
+							new JobClientActorConnectionTimeoutException("Lost connection to the JobManager."))),
+						getSelf());
+				}
+				// Connection timeout reached, let's terminate
+				terminate();
+			}
+		} else if (JobClientMessages.getSubmissionTimeout().equals(message)) {
+			// check if our job submission was successful in the meantime
+			if (!jobSuccessfullySubmitted) {
+				if (hasJobBeenSubmitted()) {
+					submitter.tell(
+						decorateMessage(new Status.Failure(
+							new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out."))),
+						getSelf());
+				}
+
+				// We haven't heard back from the job manager after sending the job graph to him,
+				// therefore terminate
+				terminate();
 			}
 		}
 
 		// =========== Unknown Messages ===============
 		
 		else {
-			logger.error("JobClient received unknown message: " + message);
+			LOG.error("JobClient received unknown message: " + message);
 		}
 	}
 
@@ -161,9 +277,133 @@ public class JobClientActor extends FlinkUntypedActor {
 	}
 
 	private void logAndPrintMessage(Object message) {
-		logger.info(message.toString());
+		LOG.info(message.toString());
 		if (sysoutUpdates) {
 			System.out.println(message.toString());
 		}
 	}
+
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+		getSelf().tell(
+			decorateMessage(new JobManagerLeaderAddress(leaderAddress, leaderSessionID)),
+			getSelf());
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		LOG.error("Error occurred in the LeaderRetrievalService.", exception);
+		getSelf().tell(decorateMessage(PoisonPill.getInstance()), getSelf());
+	}
+
+	private void disconnectFromJobManager() {
+		if (jobManager != ActorRef.noSender()) {
+			getContext().unwatch(jobManager);
+			jobManager = ActorRef.noSender();
+		}
+	}
+
+	private void connectToJobManager(ActorRef jobManager) {
+		if (this.jobManager != ActorRef.noSender()) { // check the field: the parameter shadows it
+			getContext().unwatch(this.jobManager); // stop watching the previously connected JobManager
+		}
+
+		LOG.info("Connected to new JobManager {}.", jobManager.path());
+
+		this.jobManager = jobManager;
+		getContext().watch(jobManager); // watch the new JobManager for termination
+	}
+
+	private void tryToSubmitJob(final JobGraph jobGraph) {
+		this.jobGraph = jobGraph;
+
+		if (isConnected()) {
+			LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress",
+				jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID());
+
+			Futures.future(new Callable<Object>() { // upload and submit off the actor's message loop
+				@Override
+				public Object call() throws Exception {
+					ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
+
+					LOG.info("Upload jar files to job manager {}.", jobManager.path());
+
+					try {
+						JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+					} catch (IOException exception) {
+						getSelf().tell(
+							decorateMessage(new JobManagerMessages.JobResultFailure(
+								new SerializedThrowable(
+									new JobSubmissionException(
+										jobGraph.getJobID(),
+										"Could not upload the jar files to the job manager.",
+										exception)
+								)
+							)),
+							ActorRef.noSender());
+						return null; // upload failed: neither submit the job nor arm the submission timeout
+					}
+
+					LOG.info("Submit job to the job manager {}.", jobManager.path());
+
+					jobManager.tell(
+						decorateMessage(
+							new JobManagerMessages.SubmitJob(
+								jobGraph,
+								ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
+						getSelf());
+
+					// issue a SubmissionTimeout message to check that we submit the job within
+					// the given timeout
+					getContext().system().scheduler().scheduleOnce(
+						timeout,
+						getSelf(),
+						decorateMessage(JobClientMessages.getSubmissionTimeout()),
+						getContext().dispatcher(),
+						ActorRef.noSender());
+
+					return null;
+				}
+			}, getContext().dispatcher());
+		} else {
+			LOG.info("Could not submit job {} ({}), because there is no connection to a " +
+					"JobManager.",
+				jobGraph.getName(), jobGraph.getJobID());
+
+			// We want to submit a job, but we haven't found a job manager yet.
+			// Give the actor another chance to find a job manager within the given timeout.
+			getContext().system().scheduler().scheduleOnce(
+				timeout,
+				getSelf(),
+				decorateMessage(JobClientMessages.getConnectionTimeout()),
+				getContext().dispatcher(),
+				ActorRef.noSender()
+			);
+		}
+	}
+
+	private void terminate() {
+		terminated = true;
+		disconnectFromJobManager();
+		getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
+	}
+
+	private boolean isConnected() {
+		return jobManager != ActorRef.noSender();
+	}
+
+	private boolean hasJobBeenSubmitted() {
+		return submitter != ActorRef.noSender();
+	}
+
+	public static Props createJobClientActorProps(
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration timeout,
+			boolean sysoutUpdates) {
+		return Props.create(
+			JobClientActor.class,
+			leaderRetrievalService,
+			timeout,
+			sysoutUpdates);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorConnectionTimeoutException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorConnectionTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorConnectionTimeoutException.java
new file mode 100644
index 0000000..72a5658
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorConnectionTimeoutException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+/**
+ * Exception which is thrown when the {@link JobClientActor} wants to submit a job to
+ * the job manager but has not found one after a given timeout interval.
+ */
+public class JobClientActorConnectionTimeoutException extends Exception {
+	private static final long serialVersionUID = 2287747430528388637L;
+
+	public JobClientActorConnectionTimeoutException(String msg) {
+		super(msg);
+	}
+
+	public JobClientActorConnectionTimeoutException(String msg, Throwable cause) {
+		super(msg, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorSubmissionTimeoutException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorSubmissionTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorSubmissionTimeoutException.java
new file mode 100644
index 0000000..2d39462
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorSubmissionTimeoutException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+/**
+ * Exception which is thrown by the {@link JobClientActor} if it has not heard back from the job
+ * manager after it has submitted a job to it within a given timeout interval.
+ */
+public class JobClientActorSubmissionTimeoutException extends Exception {
+	private static final long serialVersionUID = 8762463142030454853L;
+
+	public JobClientActorSubmissionTimeoutException(String msg) {
+		super(msg);
+	}
+
+	public JobClientActorSubmissionTimeoutException(String msg, Throwable cause) {
+		super(msg, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index eef28d8..055274d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -370,6 +370,8 @@ class JobManager(
         akka.serialization.JavaSerializer.currentSystem.withValue(
           context.system.asInstanceOf[ExtendedActorSystem]) {
 
+          log.info(s"Recovering all jobs.")
+
           val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
 
           if (!leaderElectionService.hasLeadership()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
index ac37493..a60fa7a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.messages
 
+import java.util.UUID
+
+import akka.actor.ActorRef
 import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.util.SerializedThrowable
 
 /**
  * This object contains the [[org.apache.flink.runtime.client.JobClient]] specific messages
@@ -47,4 +49,26 @@ object JobClientMessages {
    * @param jobGraph The job to be executed.
    */
   case class SubmitJobDetached(jobGraph: JobGraph)
+
+  /** Notifies the JobClientActor about a new leader address and a leader session ID.
+    *
+    * @param address New leader address
+    * @param leaderSessionID New leader session ID
+    */
+  case class JobManagerLeaderAddress(address: String, leaderSessionID: UUID)
+
+  /** Notifies the JobClientActor about the ActorRef of the new leader.
+    *
+    * @param jobManager ActorRef of the new leader
+    */
+  case class JobManagerActorRef(jobManager: ActorRef) extends RequiresLeaderSessionID
+
+  /** Message which is triggered when the submission timeout has been reached. */
+  case object SubmissionTimeout extends RequiresLeaderSessionID
+
+  /** Message which is triggered when the connection timeout has been reached. */
+  case object ConnectionTimeout extends RequiresLeaderSessionID
+
+  def getSubmissionTimeout(): AnyRef = SubmissionTimeout
+  def getConnectionTimeout(): AnyRef = ConnectionTimeout
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 8097bdc..8a4d27b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -84,7 +84,7 @@ object JobManagerMessages {
   /**
    * Triggers recovery of all available jobs.
    */
-  case class RecoverAllJobs() extends RequiresLeaderSessionID
+  case object RecoverAllJobs extends RequiresLeaderSessionID
 
   /**
    * Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is
@@ -427,4 +427,8 @@ object JobManagerMessages {
   def getRequestArchive: AnyRef = {
     RequestArchive
   }
+
+  def getRecoverAllJobs: AnyRef = {
+    RecoverAllJobs
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 2df3437..9c4381e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, LeaderRetrievalListener,
 StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtAnyJobManager
-import org.apache.flink.runtime.util.ZooKeeperUtils
+import org.apache.flink.runtime.util.{LeaderRetrievalUtils, StandaloneUtils, ZooKeeperUtils}
 import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
 
 import org.slf4j.LoggerFactory
@@ -388,29 +388,29 @@ abstract class FlinkMiniCluster(
     : JobExecutionResult = {
     submitJobAndWait(jobGraph, printUpdates, timeout)
   }
+
+  def submitJobAndWait(
+    jobGraph: JobGraph,
+    printUpdates: Boolean,
+    timeout: FiniteDuration)
+  : JobExecutionResult = {
+    submitJobAndWait(jobGraph, printUpdates, timeout, createLeaderRetrievalService())
+  }
   
   @throws(classOf[JobExecutionException])
   def submitJobAndWait(
       jobGraph: JobGraph,
       printUpdates: Boolean,
-      timeout: FiniteDuration)
+      timeout: FiniteDuration,
+      leaderRetrievalService: LeaderRetrievalService)
     : JobExecutionResult = {
 
     val clientActorSystem = startJobClientActorSystem(jobGraph.getJobID)
 
      try {
-       val jobManagerGateway = try {
-           getLeaderGateway(timeout)
-         } catch {
-           case e: Exception => throw new JobExecutionException(
-             jobGraph.getJobID,
-             "Could not retrieve leading job manager gateway.",
-             e)
-         }
-
      JobClient.submitJobAndWait(
        clientActorSystem,
-       jobManagerGateway,
+       leaderRetrievalService,
        jobGraph,
        timeout,
        printUpdates,

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index b28fb73..d9d9596 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1117,7 +1117,12 @@ class TaskManager(
 
     currentJobManager match {
       case Some(jm) =>
-        handleJobManagerDisconnect(jm, s"JobManager ${newJobManagerAkkaURL} was elected as leader.")
+        Option(newJobManagerAkkaURL) match {
+          case Some(newJMAkkaURL) =>
+            handleJobManagerDisconnect(jm, s"JobManager ${newJMAkkaURL} was elected as leader.")
+          case None =>
+            handleJobManagerDisconnect(jm, s"Old JobManager lost its leadership.")
+        }
       case None =>
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
new file mode 100644
index 0000000..a93a515
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import akka.actor.PoisonPill;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+
+public class JobClientActorRecoveryITCase extends TestLogger {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public static TestingServer zkServer;
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		zkServer = new TestingServer();
+
+		zkServer.start();
+	}
+
+	@org.junit.AfterClass public static void teardown() throws Exception {
+		if (zkServer != null) {
+			zkServer.stop(); // stop the ZooKeeper test server started in setup()
+			zkServer = null;
+		}
+	}
+
+	/**
+	 * Tests whether the JobClientActor can connect to a newly elected leading job manager to obtain
+	 * the JobExecutionResult. The submitted job blocks for the first execution attempt. The
+	 * leading job manager will be killed so that the second job manager will be elected as the
+	 * leader. The newly elected leader has to retrieve the checkpointed job from ZooKeeper
+	 * and continue its execution. This time, the job does not block and, thus, can be finished.
+	 * The execution result should be sent to the JobClientActor which originally submitted the
+	 * job.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testJobClientRecovery() throws Exception {
+		File rootFolder = tempFolder.getRoot();
+
+		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+			zkServer.getConnectString(),
+			rootFolder.getPath());
+
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+
+		final TestingCluster cluster = new TestingCluster(config);
+		cluster.start();
+
+		JobVertex blockingVertex = new JobVertex("Blocking Vertex");
+		blockingVertex.setInvokableClass(BlockingTask.class);
+		blockingVertex.setParallelism(1);
+		final JobGraph jobGraph = new JobGraph("Blocking Test Job", blockingVertex);
+		final Promise<JobExecutionResult> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+
+		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+
+		try {
+			Thread submitter = new Thread(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						JobExecutionResult result = cluster.submitJobAndWait(jobGraph, false);
+						promise.success(result);
+					} catch (Exception e) {
+						promise.failure(e);
+					}
+				}
+			});
+
+			submitter.start();
+
+			synchronized (BlockingTask.waitLock) {
+				while (BlockingTask.HasBlockedExecution < 1 && deadline.hasTimeLeft()) {
+					BlockingTask.waitLock.wait(deadline.timeLeft().toMillis());
+				}
+			}
+
+			if (deadline.isOverdue()) {
+				Assert.fail("The job has not blocked within the given deadline.");
+			}
+
+			ActorGateway gateway = cluster.getLeaderGateway(deadline.timeLeft());
+
+			gateway.tell(TestingJobManagerMessages.getDisablePostStop());
+			gateway.tell(PoisonPill.getInstance());
+
+			// if the job fails then an exception is thrown here
+			Await.result(promise.future(), deadline.timeLeft());
+		} finally {
+			cluster.shutdown();
+		}
+	}
+
+	public static class BlockingTask extends AbstractInvokable {
+
+		private volatile static int BlockExecution = 1;
+		private volatile static int HasBlockedExecution = 0;
+		private static Object waitLock = new Object();
+
+		@Override
+		public void registerInputOutput() throws Exception {
+			// Nothing to do
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			if (BlockExecution > 0) {
+				BlockExecution--;
+
+				// Tell the test that it's OK to kill the leader
+				synchronized (waitLock) {
+					HasBlockedExecution++;
+					waitLock.notifyAll();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
new file mode 100644
index 0000000..00ad632
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobClientMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+public class JobClientActorTest extends TestLogger {
+
+	private static ActorSystem system;
+	private static JobGraph testJobGraph = new JobGraph("Test Job");
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+		system = null;
+	}
+
+	/** Tests that a {@link JobClientActorSubmissionTimeoutException} is thrown when the job cannot
+	 * be submitted by the JobClientActor. This is the case here because the started JobManager
+	 * never replies to a SubmitJob message.
+	 *
+	 * @throws Exception
+	 */
+	@Test(expected=JobClientActorSubmissionTimeoutException.class)
+	public void testSubmissionTimeout() throws Exception {
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration timeout = jobClientActorTimeout.$times(2);
+
+		UUID leaderSessionID = UUID.randomUUID();
+
+		ActorRef jobManager = system.actorOf(
+			Props.create(
+				PlainActor.class,
+				leaderSessionID));
+
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+			jobManager.path().toString(),
+			leaderSessionID
+		);
+
+		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
+			testingLeaderRetrievalService,
+			jobClientActorTimeout,
+			false);
+
+		ActorRef jobClientActor = system.actorOf(jobClientActorProps);
+
+
+		Future<Object> jobExecutionResult = Patterns.ask(
+			jobClientActor,
+			new JobClientMessages.SubmitJobAndWait(testJobGraph),
+			new Timeout(timeout));
+
+		Await.result(jobExecutionResult, timeout);
+	}
+
+	/** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
+	 * is thrown when the JobClientActor wants to submit a job but has not connected to a JobManager.
+	 *
+	 * @throws Exception
+	 */
+	@Test(expected=JobClientActorConnectionTimeoutException.class)
+	public void testConnectionTimeoutWithoutJobManager() throws Exception {
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration timeout = jobClientActorTimeout.$times(2);
+
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
+			testingLeaderRetrievalService,
+			jobClientActorTimeout,
+			false);
+
+		ActorRef jobClientActor = system.actorOf(jobClientActorProps);
+
+		Future<Object> jobExecutionResult = Patterns.ask(
+			jobClientActor,
+			new JobClientMessages.SubmitJobAndWait(testJobGraph),
+			new Timeout(timeout));
+
+		Await.result(jobExecutionResult, timeout);
+	}
+
+	/** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
+	 * is thrown after a successful job submission if the JobManager dies.
+	 *
+	 * @throws Exception
+	 */
+	@Test(expected=JobClientActorConnectionTimeoutException.class)
+	public void testConnectionTimeoutAfterJobSubmission() throws Exception {
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration timeout = jobClientActorTimeout.$times(2);
+
+		UUID leaderSessionID = UUID.randomUUID();
+
+		ActorRef jobManager = system.actorOf(
+			Props.create(
+				JobAcceptingActor.class,
+				leaderSessionID));
+
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+			jobManager.path().toString(),
+			leaderSessionID
+		);
+
+		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
+			testingLeaderRetrievalService,
+			jobClientActorTimeout,
+			false);
+
+		ActorRef jobClientActor = system.actorOf(jobClientActorProps);
+
+		Future<Object> jobExecutionResult = Patterns.ask(
+			jobClientActor,
+			new JobClientMessages.SubmitJobAndWait(testJobGraph),
+			new Timeout(timeout));
+
+		Future<Object> waitFuture = Patterns.ask(jobManager, new RegisterTest(), new Timeout(timeout));
+
+		Await.result(waitFuture, timeout);
+
+		jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+		Await.result(jobExecutionResult, timeout);
+	}
+
+	public static class PlainActor extends FlinkUntypedActor {
+
+		private final UUID leaderSessionID;
+
+		public PlainActor(UUID leaderSessionID) {
+			this.leaderSessionID = leaderSessionID;
+		}
+
+		@Override
+		protected void handleMessage(Object message) throws Exception {
+
+		}
+
+		@Override
+		protected UUID getLeaderSessionID() {
+			return leaderSessionID;
+		}
+	}
+
+	public static class JobAcceptingActor extends FlinkUntypedActor {
+		private final UUID leaderSessionID;
+		private boolean jobAccepted = false;
+		private ActorRef testFuture = ActorRef.noSender();
+
+		public JobAcceptingActor(UUID leaderSessionID) {
+			this.leaderSessionID = leaderSessionID;
+		}
+
+		@Override
+		protected void handleMessage(Object message) throws Exception {
+			if (message instanceof JobManagerMessages.SubmitJob) {
+				getSender().tell(
+					new JobManagerMessages.JobSubmitSuccess(((JobManagerMessages.SubmitJob) message).jobGraph().getJobID()),
+					getSelf());
+
+				jobAccepted = true;
+
+				if(testFuture != ActorRef.noSender()) {
+					testFuture.tell(Messages.getAcknowledge(), getSelf());
+				}
+			} else if (message instanceof RegisterTest) {
+				testFuture = getSender();
+
+				if (jobAccepted) {
+					testFuture.tell(Messages.getAcknowledge(), getSelf());
+				}
+			}
+		}
+
+		@Override
+		protected UUID getLeaderSessionID() {
+			return leaderSessionID;
+		}
+	}
+
+	public static class RegisterTest{}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 5753cde..aa03fe1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.io.network.api.reader.BufferReader;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -46,7 +44,6 @@ public class PartialConsumePipelinedResultTest {
 	private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
 
 	private static TestingCluster flink;
-	private static ActorSystem jobClient;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
@@ -59,8 +56,6 @@ public class PartialConsumePipelinedResultTest {
 		flink = new TestingCluster(config, true);
 
 		flink.start();
-
-		jobClient = JobClient.startJobClientActorSystem(flink.configuration());
 	}
 
 	@AfterClass
@@ -102,13 +97,7 @@ public class PartialConsumePipelinedResultTest {
 		sender.setSlotSharingGroup(slotSharingGroup);
 		receiver.setSlotSharingGroup(slotSharingGroup);
 
-		JobClient.submitJobAndWait(
-				jobClient,
-				flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-				jobGraph,
-				TestingUtils.TESTING_DURATION(),
-				false,
-				this.getClass().getClassLoader());
+		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
 	}
 
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index bbd8fad..c804830 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -43,11 +43,14 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -56,6 +59,9 @@ import java.util.concurrent.TimeUnit;
 
 public class JobManagerLeaderElectionTest extends TestLogger {
 
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
 	private static ActorSystem actorSystem;
 	private static TestingServer testingServer;
 	private static Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION());
@@ -84,12 +90,10 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 	 */
 	@Test
 	public void testLeaderElection() throws Exception {
-		final Configuration configuration = new Configuration();
-
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		configuration.setString(
-			ConfigConstants.ZOOKEEPER_QUORUM_KEY,
-			testingServer.getConnectString());
+		final Configuration configuration = ZooKeeperTestUtils
+			.createZooKeeperRecoveryModeConfig(
+				testingServer.getConnectString(),
+				tempFolder.getRoot().getPath());
 
 		ActorRef jm = null;
 
@@ -115,12 +119,11 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 	 */
 	@Test
 	public void testLeaderReelection() throws Exception {
-		final Configuration configuration = new Configuration();
+		final Configuration configuration = ZooKeeperTestUtils
+			.createZooKeeperRecoveryModeConfig(
+				testingServer.getConnectString(),
+				tempFolder.getRoot().getPath());
 
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		configuration.setString(
-			ConfigConstants.ZOOKEEPER_QUORUM_KEY,
-			testingServer.getConnectString());
 
 		ActorRef jm;
 		ActorRef jm2 = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 8dd380e..0b84474 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -129,7 +129,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 
 		// try to resubmit now the non-blocking job, it should complete successfully
 		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
-		cluster.submitJobAndWait(job, false, timeout);
+		cluster.submitJobAndWait(job, false, timeout, new TestingLeaderRetrievalService(jm2.path(), jm2.leaderSessionID()));
 	}
 
 	/**
@@ -207,7 +207,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		UUID leaderSessionID = UUID.randomUUID();
 		UUID newLeaderSessionID = UUID.randomUUID();
 
-		FiniteDuration shortTimeout = new FiniteDuration(20, TimeUnit.SECONDS);
+		FiniteDuration shortTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
 		cluster.grantLeadership(0, leaderSessionID);
 		cluster.notifyRetrievalListeners(0, leaderSessionID);
@@ -244,10 +244,11 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 
 		cluster.waitForTaskManagersToBeRegistered();
 
+		ActorGateway leaderGateway = cluster.getLeaderGateway(timeout);
+
 		// try to resubmit now the non-blocking job, it should complete successfully
 		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
-		cluster.submitJobAndWait(job, false, timeout);
-
+		cluster.submitJobAndWait(job, false, timeout, new TestingLeaderRetrievalService(leaderGateway.path(), leaderGateway.leaderSessionID()));
 	}
 
 	public JobGraph createBlockingJob(int parallelism) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index 5b63107..c83f548 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import scala.Option;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -39,11 +41,8 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 	private final boolean useSingleActorSystem;
 	private final StreamingMode streamingMode;
 
-	public TestingLeaderElectionService[] leaderElectionServices;
-	public TestingLeaderRetrievalService[] leaderRetrievalServices;
-
-	private int leaderElectionServiceCounter = 0;
-	private int leaderRetrievalServiceCounter = 0;
+	public List<TestingLeaderElectionService> leaderElectionServices;
+	public List<TestingLeaderRetrievalService> leaderRetrievalServices;
 
 	private int leaderIndex = -1;
 
@@ -58,8 +57,8 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 		this.useSingleActorSystem = singleActorSystem;
 		this.streamingMode = streamingMode;
 
-		leaderElectionServices = new TestingLeaderElectionService[this.numJobManagers()];
-		leaderRetrievalServices = new TestingLeaderRetrievalService[this.numTaskManagers() + 1];
+		leaderElectionServices = new ArrayList<TestingLeaderElectionService>();
+		leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>();
 	}
 
 	@Override
@@ -79,18 +78,18 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 
 	@Override
 	public Option<LeaderElectionService> createLeaderElectionService() {
-		leaderElectionServices[leaderElectionServiceCounter] = new TestingLeaderElectionService();
+		leaderElectionServices.add(new TestingLeaderElectionService());
 
-		LeaderElectionService result = leaderElectionServices[leaderElectionServiceCounter++];
+		LeaderElectionService result = leaderElectionServices.get(leaderElectionServices.size() - 1);
 
 		return Option.apply(result);
 	}
 
 	@Override
 	public LeaderRetrievalService createLeaderRetrievalService() {
-		leaderRetrievalServices[leaderRetrievalServiceCounter] = new TestingLeaderRetrievalService();
+		leaderRetrievalServices.add(new TestingLeaderRetrievalService());
 
-		return leaderRetrievalServices[leaderRetrievalServiceCounter++];
+		return leaderRetrievalServices.get(leaderRetrievalServices.size() - 1);
 	}
 
 	@Override
@@ -103,11 +102,11 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 	public void grantLeadership(int index, UUID leaderSessionID) {
 		if(leaderIndex >= 0) {
 			// first revoke leadership
-			leaderElectionServices[leaderIndex].notLeader();
+			leaderElectionServices.get(leaderIndex).notLeader();
 		}
 
 		// make the JM with index the new leader
-		leaderElectionServices[index].isLeader(leaderSessionID);
+		leaderElectionServices.get(index).isLeader(leaderSessionID);
 
 		leaderIndex = index;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index ea058f4..4e119fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -18,16 +18,13 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import java.io.Serializable;
 import java.util.UUID;
 
 /**
  * Test {@link LeaderElectionService} implementation which directly forwards isLeader and notLeader
  * calls to the contender.
  */
-public class TestingLeaderElectionService implements LeaderElectionService, Serializable {
-
-	private static final long serialVersionUID = -8007939683948014574L;
+public class TestingLeaderElectionService implements LeaderElectionService {
 
 	private LeaderContender contender;
 	private boolean hasLeadership = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
index 43902fd..c44fc2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
@@ -29,11 +29,27 @@ import java.util.UUID;
  */
 public class TestingLeaderRetrievalService implements LeaderRetrievalService {
 
+	private final String leaderAddress;
+	private final UUID leaderSessionID;
+
 	private LeaderRetrievalListener listener;
 
+	public TestingLeaderRetrievalService() {
+		this(null, null);
+	}
+
+	public TestingLeaderRetrievalService(String leaderAddress, UUID leaderSessionID) {
+		this.leaderAddress = leaderAddress;
+		this.leaderSessionID = leaderSessionID;
+	}
+
 	@Override
 	public void start(LeaderRetrievalListener listener) throws Exception {
 		this.listener = listener;
+
+		if (leaderAddress != null) {
+			listener.notifyLeaderAddress(leaderAddress, leaderSessionID);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index e91f068..b8f4ede 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -70,6 +70,18 @@ trait TestingJobManagerLike extends FlinkActor {
 
   var disconnectDisabled = false
 
+  var postStopEnabled = true
+
+  abstract override def postStop(): Unit = {
+    if (postStopEnabled) {
+      super.postStop()
+    } else {
+      // only stop leader election service to revoke the leadership of this JM so that a new JM
+      // can be elected leader
+      leaderElectionService.stop()
+    }
+  }
+
   abstract override def handleMessage: Receive = {
     handleTestingMessage orElse super.handleMessage
   }
@@ -270,6 +282,9 @@ trait TestingJobManagerLike extends FlinkActor {
     case DisableDisconnect =>
       disconnectDisabled = true
 
+    case DisablePostStop =>
+      postStopEnabled = false
+
     case msg: Disconnect =>
       if (!disconnectDisabled) {
         super.handleMessage(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 4f5cf14..e4d0a6f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -87,5 +87,12 @@ object TestingJobManagerMessages {
    */
   case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
 
-  def getNotifyWhenLeader: AnyRef = NotifyWhenLeader
+  /** Disables the post stop method of the [[TestingJobManager]].
+    *
+    * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership
+    */
+  case object DisablePostStop
+
+  def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
+  def getDisablePostStop(): AnyRef = DisablePostStop
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index ed2113a..5c7a932 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -22,19 +22,20 @@ import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.PoisonPill;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -42,9 +43,13 @@ import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
 
 import java.io.File;
 import java.io.IOException;
@@ -128,7 +133,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 	@Test
 	public void testJobExecutionOnClusterWithLeaderReelection() throws Exception {
 		int numJMs = 10;
-		int numTMs = 3;
+		int numTMs = 2;
 		int numSlotsPerTM = 3;
 		int parallelism = numTMs * numSlotsPerTM;
 
@@ -141,10 +146,9 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
 		configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getPath());
 
-		// @TODO @tillrohrmann temporary "disable" recovery, because currently the client does
-		// not need to resubmit a failed job to a new leader. Should we keep this test and
-		// disable recovery fully or will this be subsumed by the real client changes anyways?
-		configuration.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, timeout.toString());
+		// we "effectively" disable the automatic RecoverAllJobs message and send it manually to make
+		// sure that all TMs have registered to the JM prior to issuing the RecoverAllJobs message
+		configuration.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, AkkaUtils.INF_TIMEOUT().toString());
 
 		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
 
@@ -186,51 +190,55 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 			thread.start();
 
+			Deadline deadline = timeout.$times(3).fromNow();
+
 			// Kill all JobManager except for two
-			for (int i = 0; i < numJMs - 2; i++) {
-				ActorGateway jm = cluster.getLeaderGateway(timeout);
+			for (int i = 0; i < numJMs; i++) {
+				ActorGateway jm = cluster.getLeaderGateway(deadline.timeLeft());
 
 				cluster.waitForTaskManagersToBeRegisteredAtJobManager(jm.actor());
 
-				Future<Object> future = jm.ask(new WaitForAllVerticesToBeRunningOrFinished(graph.getJobID()), timeout);
-
-				Await.ready(future, timeout);
+				// recover all jobs, sent manually
+				log.info("Sent recover all jobs manually to job manager {}.", jm.path());
+				jm.tell(JobManagerMessages.getRecoverAllJobs());
 
-				cluster.clearLeader();
-
-				jm.tell(Kill.getInstance());
-			}
+				if (i < numJMs - 1) {
+					Future<Object> future = jm.ask(new WaitForAllVerticesToBeRunningOrFinished(graph.getJobID()), deadline.timeLeft());
 
-			ActorGateway jm = cluster.getLeaderGateway(timeout);
+					Await.ready(future, deadline.timeLeft());
 
-			cluster.waitForTaskManagersToBeRegisteredAtJobManager(jm.actor());
+					cluster.clearLeader();
 
-			Future<Object> future = jm.ask(new WaitForAllVerticesToBeRunningOrFinished(graph.getJobID()), timeout);
+					if (i == numJMs - 2) {
+						Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
+					}
 
-			Await.ready(future, timeout);
+					log.info("Kill job manager {}.", jm.path());
 
-			cluster.clearLeader();
+					jm.tell(TestingJobManagerMessages.getDisablePostStop());
+					jm.tell(Kill.getInstance());
+				}
+			}
 
-			// set the BlockinOnceReceiver for the execution on the last JM to non-blocking, so
-			// that it can succeed
-			Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
+			log.info("Waiting for submitter thread to terminate.");
 
-			jm.tell(PoisonPill.getInstance());
+			thread.join(deadline.timeLeft().toMillis());
 
-			thread.join(timeout.toMillis());
+			log.info("Submitter thread has terminated.");
 
 			if (thread.isAlive()) {
-				jobSubmission.finished = true;
 				fail("The job submission thread did not stop (meaning it did not succeeded in" +
 						"executing the test job.");
 			}
+
+			Await.result(jobSubmission.resultPromise.future(), deadline.timeLeft());
 		}
 		finally {
 			if (clientActorSystem != null) {
 				cluster.shutdownJobClientActorSystem(clientActorSystem);
 			}
 
-			if (thread != null && thread.isAlive() && jobSubmission != null) {
+			if (thread != null && thread.isAlive()) {
 				jobSubmission.finished = true;
 			}
 			cluster.stop();
@@ -238,12 +246,15 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 	}
 
 	public static class JobSubmitterRunnable implements Runnable {
+		private static final Logger LOG = LoggerFactory.getLogger(JobSubmitterRunnable.class);
 		boolean finished = false;
 
 		final ActorSystem clientActorSystem;
 		final ForkableFlinkMiniCluster cluster;
 		final JobGraph graph;
 
+		final Promise<JobExecutionResult> resultPromise = new Promise.DefaultPromise<>();
+
 		public JobSubmitterRunnable(
 				ActorSystem actorSystem,
 				ForkableFlinkMiniCluster cluster,
@@ -255,39 +266,23 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 		@Override
 		public void run() {
-			while (!finished) {
-				try {
-					LeaderRetrievalService lrService =
-							LeaderRetrievalUtils.createLeaderRetrievalService(
-									cluster.configuration());
-
-					ActorGateway jobManagerGateway =
-							LeaderRetrievalUtils.retrieveLeaderGateway(
-									lrService,
-									clientActorSystem,
-									timeout);
-
-					JobClient.submitJobAndWait(
-							clientActorSystem,
-							jobManagerGateway,
-							graph,
-							timeout,
-							false,
-							getClass().getClassLoader());
-
-					finished = true;
-				}
-				catch (JobExecutionException e) {
-					// This was expected, so just try again to submit the job
-				}
-				catch (LeaderRetrievalException e) {
-					// This can also happen, so just try again to submit the job
-				}
-				catch (Exception e) {
-					// This was not expected... fail the test case
-					e.printStackTrace();
-					fail("Caught unexpected exception in job submission test case.");
-				}
+			try {
+				LeaderRetrievalService lrService =
+						LeaderRetrievalUtils.createLeaderRetrievalService(
+								cluster.configuration());
+
+				JobExecutionResult result = JobClient.submitJobAndWait(
+						clientActorSystem,
+						lrService,
+						graph,
+						timeout,
+						false,
+						getClass().getClassLoader());
+
+				resultPromise.success(result);
+			} catch (Exception e) {
+				// This was not expected... fail the test case
+				resultPromise.failure(e);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d18f5809/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index 9d29841..3ba6d1d 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -18,7 +18,8 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
+log4j.logger.org.apache.flink.runtime.client.JobClientActor=DEBUG
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
@@ -27,5 +28,5 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-log4j.logger.org.apache.zookeeper=OFF, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
+log4j.logger.org.apache.zookeeper=OFF
\ No newline at end of file


Mime
View raw message