flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [3/3] flink git commit: [FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher
Date Wed, 13 Dec 2017 13:26:32 GMT
[FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher

Implement SubmittedJobGraphListener interface in Dispatcher

Call start() on SubmittedJobGraphStore with Dispatcher as listener. To enable
this, the dispatcher must implement the SubmittedJobGraphListener interface. Add
simple unit tests for the new methods. Refactor DispatcherTest to remove
redundancy.

[FLINK-8176][flip6] Make InMemorySubmittedJobGraphStore thread-safe

[FLINK-8176][flip6] Add method isStarted() to TestingLeaderElectionService

[FLINK-8176][flip6] Return same RunningJobsRegistry instance from TestingHighAvailabilityServices

[FLINK-8176][flip6] Fix race conditions in Dispatcher and DispatcherTest

Check whether a JobManagerRunner already exists for the job before submitting it.
Replace JobManagerRunner mock used in tests with real instance.
Do not run job graph recovery in the actor's main thread when the job graph is
recovered from SubmittedJobGraphListener#onAddedJobGraph(JobID).

[FLINK-8176][flip6] Rename variables in DispatcherTest

[FLINK-8176][flip6] Remove injectMocks in DispatcherTest

[FLINK-8176][flip6] Update Dispatcher's SubmittedJobGraphListener callbacks

Always attempt the job submission when onAddedJobGraph is called, and the job
removal when onRemovedJobGraph is called. The checks in submitJob and removeJob
are sufficient.

This closes #5107.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ddb674c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ddb674c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ddb674c

Branch: refs/heads/master
Commit: 7ddb674cb17c35f17aa073d3bfd6897d7fc13b9e
Parents: 8941f63
Author: gyao <gary@data-artisans.com>
Authored: Thu Nov 30 15:44:23 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Dec 13 14:21:52 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  46 +++-
 .../runtime/dispatcher/DispatcherTest.java      | 250 ++++++++++++-------
 .../TestingHighAvailabilityServices.java        |   4 +-
 .../TestingLeaderElectionService.java           |   9 +
 .../InMemorySubmittedJobGraphStore.java         |  26 +-
 5 files changed, 235 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ddb674c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 8a26f95..ea3a6ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
@@ -63,6 +64,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -73,7 +75,8 @@ import java.util.concurrent.CompletableFuture;
  * the jobs and to recover them in case of a master failure. Furthermore, it knows
  * about the state of the Flink session cluster.
  */
-public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
DispatcherGateway, LeaderContender {
+public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
+	DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
 
 	public static final String DISPATCHER_NAME = "dispatcher";
 
@@ -173,6 +176,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 	public void start() throws Exception {
 		super.start();
 
+		submittedJobGraphStore.start(this);
 		leaderElectionService.start(this);
 	}
 
@@ -197,7 +201,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 				new JobSubmissionException(jobId, "Could not retrieve the job status.", e));
 		}
 
-		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
+		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING &&
+			!jobManagerRunners.containsKey(jobId)) {
 			try {
 				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
 			} catch (Exception e) {
@@ -248,7 +253,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 
 	@Override
 	public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
-		return CompletableFuture.completedFuture(jobManagerRunners.keySet());
+		return CompletableFuture.completedFuture(
+			Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
 	}
 
 	@Override
@@ -399,7 +405,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 	/**
 	 * Recovers all jobs persisted via the submitted job graph store.
 	 */
-	private void recoverJobs() {
+	@VisibleForTesting
+	void recoverJobs() {
 		log.info("Recovering all persisted jobs.");
 
 		getRpcService().execute(
@@ -508,6 +515,37 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 	}
 
 	//------------------------------------------------------
+	// SubmittedJobGraphListener
+	//------------------------------------------------------
+
+	@Override
+	public void onAddedJobGraph(final JobID jobId) {
+		getRpcService().execute(() -> {
+			final SubmittedJobGraph submittedJobGraph;
+			try {
+				submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
+			} catch (final Exception e) {
+				log.error("Could not recover job graph for job {}.", jobId, e);
+				return;
+			}
+			runAsync(() -> {
+				submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT);
+			});
+		});
+	}
+
+	@Override
+	public void onRemovedJobGraph(final JobID jobId) {
+		runAsync(() -> {
+			try {
+				removeJob(jobId, false);
+			} catch (final Exception e) {
+				log.error("Could not remove job {}.", jobId, e);
+			}
+		});
+	}
+
+	//------------------------------------------------------
 	// Utility classes
 	//------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ddb674c/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d5b63d4..8627c8e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -20,57 +20,95 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * Test for the {@link Dispatcher} component.
  */
 public class DispatcherTest extends TestLogger {
 
+	private static RpcService rpcService;
+
+	private static final Time TIMEOUT = Time.seconds(10L);
+
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Rule
 	public TestName name = new TestName();
 
-	private static RpcService rpcService;
-	private static final Time timeout = Time.seconds(10L);
+	private JobGraph jobGraph;
+
+	private TestingFatalErrorHandler fatalErrorHandler;
+
+	private SubmittedJobGraphStore submittedJobGraphStore;
+
+	private TestingLeaderElectionService dispatcherLeaderElectionService;
+
+	private TestingLeaderElectionService jobMasterLeaderElectionService;
+
+	private RunningJobsRegistry runningJobsRegistry;
+
+	/** Instance under test. */
+	private TestingDispatcher dispatcher;
 
 	@BeforeClass
 	public static void setup() {
@@ -86,60 +124,77 @@ public class DispatcherTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Tests that we can submit a job to the Dispatcher which then spawns a
-	 * new JobManagerRunner.
-	 */
-	@Test
-	public void testJobSubmission() throws Exception {
-		TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
-
-		TestingLeaderElectionService dispatcherLeaderElectionService = new TestingLeaderElectionService();
-		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
-		haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
-		haServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
+	@Before
+	public void setUp() throws Exception {
+		final JobVertex testVertex = new JobVertex("testVertex");
+		testVertex.setInvokableClass(NoOpInvokable.class);
+		jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
+		jobGraph.setAllowQueuedScheduling(true);
 
-		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
-		JobManagerRunner jobManagerRunner = mock(JobManagerRunner.class);
+		fatalErrorHandler = new TestingFatalErrorHandler();
+		final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
+		submittedJobGraphStore = spy(new InMemorySubmittedJobGraphStore());
 
-		final JobGraph jobGraph = mock(JobGraph.class);
-		final JobID jobId = new JobID();
-		when(jobGraph.getJobID()).thenReturn(jobId);
+		dispatcherLeaderElectionService = new TestingLeaderElectionService();
+		jobMasterLeaderElectionService = new TestingLeaderElectionService();
 
-		final TestingDispatcher dispatcher = new TestingDispatcher(
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+		haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+		haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService);
+		haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+		haServices.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService());
+		runningJobsRegistry = haServices.getRunningJobsRegistry();
+
+		final Configuration blobServerConfig = new Configuration();
+		blobServerConfig.setString(
+			BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+
+		dispatcher = new TestingDispatcher(
 			rpcService,
 			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
 			new Configuration(),
 			haServices,
 			mock(ResourceManagerGateway.class),
-			mock(BlobServer.class),
+			new BlobServer(blobServerConfig, new VoidBlobStore()),
 			heartbeatServices,
-			mock(MetricRegistryImpl.class),
+			new NoOpMetricRegistry(),
 			fatalErrorHandler,
-			jobManagerRunner,
-			jobId);
+			TEST_JOB_ID);
 
-		try {
-			dispatcher.start();
+		dispatcher.start();
+	}
 
-			CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
+	@After
+	public void tearDown() throws Exception {
+		try {
+			fatalErrorHandler.rethrowError();
+		} finally {
+			RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+		}
+	}
 
-			// wait for the leader to be elected
-			leaderFuture.get();
+	/**
+	 * Tests that we can submit a job to the Dispatcher which then spawns a
+	 * new JobManagerRunner.
+	 */
+	@Test
+	public void testJobSubmission() throws Exception {
+		CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
 
-			DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+		// wait for the leader to be elected
+		leaderFuture.get();
 
-			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+		DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-			acknowledgeFuture.get();
+		CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
 
-			verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start();
+		acknowledgeFuture.get();
 
-			// check that no error has occurred
-			fatalErrorHandler.rethrowError();
-		} finally {
-			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
-		}
+		assertTrue(
+			"jobManagerRunner was not started",
+			jobMasterLeaderElectionService.isStarted());
 	}
 
 	/**
@@ -147,61 +202,63 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testLeaderElection() throws Exception {
-		TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
-		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
-
 		UUID expectedLeaderSessionId = UUID.randomUUID();
-		CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>();
-		SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class);
-		TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService()
{
-			@Override
-			public void confirmLeaderSessionID(UUID leaderSessionId) {
-				super.confirmLeaderSessionID(leaderSessionId);
-				leaderSessionIdFuture.complete(leaderSessionId);
-			}
-		};
 
-		haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
-		haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
-		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
-		final JobID jobId = new JobID();
+		assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
 
-		final TestingDispatcher dispatcher = new TestingDispatcher(
-			rpcService,
-			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
-			new Configuration(),
-			haServices,
-			mock(ResourceManagerGateway.class),
-			mock(BlobServer.class),
-			heartbeatServices,
-			mock(MetricRegistryImpl.class),
-			fatalErrorHandler,
-			mock(JobManagerRunner.class),
-			jobId);
+		dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
 
-		try {
-			dispatcher.start();
+		UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
+			.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+		assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
+
+		verify(submittedJobGraphStore, Mockito.timeout(TIMEOUT.toMilliseconds()).atLeast(1)).getJobIds();
+	}
+
+	/**
+	 * Test callbacks from
+	 * {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}.
+	 */
+	@Test
+	public void testSubmittedJobGraphListener() throws Exception {
+		dispatcher.recoverJobsEnabled.set(false);
 
-			assertFalse(leaderSessionIdFuture.isDone());
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
-			testingLeaderElectionService.isLeader(expectedLeaderSessionId);
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-			UUID actualLeaderSessionId = leaderSessionIdFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+		jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
-			assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
+		final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(TEST_JOB_ID);
 
-			verify(mockSubmittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
-		} finally {
-			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
-		}
+		// pretend that other Dispatcher has removed job from submittedJobGraphStore
+		submittedJobGraphStore.removeJobGraph(TEST_JOB_ID);
+		dispatcher.onRemovedJobGraph(TEST_JOB_ID);
+		assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), empty());
+
+		// pretend that other Dispatcher has added a job to submittedJobGraphStore
+		runningJobsRegistry.clearJob(TEST_JOB_ID);
+		submittedJobGraphStore.putJobGraph(submittedJobGraph);
+		dispatcher.onAddedJobGraph(TEST_JOB_ID);
+		dispatcher.submitJobLatch.await();
+		assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), hasSize(1));
 	}
 
 	private static class TestingDispatcher extends Dispatcher {
 
-		private final JobManagerRunner jobManagerRunner;
 		private final JobID expectedJobId;
 
-		protected TestingDispatcher(
+		private final CountDownLatch submitJobLatch = new CountDownLatch(2);
+
+		/**
+		 * Controls whether existing jobs in {@link SubmittedJobGraphStore} should be recovered
+		 * when {@link TestingDispatcher} is granted leadership.
+		 * */
+		private final AtomicBoolean recoverJobsEnabled = new AtomicBoolean(true);
+
+		private TestingDispatcher(
 				RpcService rpcService,
 				String endpointId,
 				Configuration configuration,
@@ -211,7 +268,6 @@ public class DispatcherTest extends TestLogger {
 				HeartbeatServices heartbeatServices,
 				MetricRegistry metricRegistry,
 				FatalErrorHandler fatalErrorHandler,
-				JobManagerRunner jobManagerRunner,
 				JobID expectedJobId) throws Exception {
 			super(
 				rpcService,
@@ -225,7 +281,6 @@ public class DispatcherTest extends TestLogger {
 				fatalErrorHandler,
 				null);
 
-			this.jobManagerRunner = jobManagerRunner;
 			this.expectedJobId = expectedJobId;
 		}
 
@@ -243,7 +298,32 @@ public class DispatcherTest extends TestLogger {
 				FatalErrorHandler fatalErrorHandler) throws Exception {
 			assertEquals(expectedJobId, jobGraph.getJobID());
 
-			return jobManagerRunner;
+			return new JobManagerRunner(resourceId, jobGraph, configuration, rpcService,
+				highAvailabilityServices, heartbeatServices, jobManagerServices, metricRegistry,
+				onCompleteActions, fatalErrorHandler, null);
+		}
+
+		@Override
+		public CompletableFuture<Acknowledge> submitJob(final JobGraph jobGraph, final Time
timeout) {
+			final CompletableFuture<Acknowledge> submitJobFuture = super.submitJob(jobGraph,
timeout);
+
+			try {
+				submitJobFuture.get();
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			submitJobLatch.countDown();
+			return submitJobFuture;
+		}
+
+		@Override
+		void recoverJobs() {
+			if (recoverJobsEnabled.get()) {
+				super.recoverJobs();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ddb674c/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index dba7bef..db0b88e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -52,6 +52,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile SubmittedJobGraphStore submittedJobGraphStore;
 
+	private final RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry();
+
 	// ------------------------------------------------------------------------
 	//  Setters for mock / testing implementations
 	// ------------------------------------------------------------------------
@@ -185,7 +187,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	@Override
 	public RunningJobsRegistry getRunningJobsRegistry() {
-		return new StandaloneRunningJobsRegistry();
+		return runningJobsRegistry;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7ddb674c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index d951db5..4ecb9b61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -86,4 +86,13 @@ public class TestingLeaderElectionService implements LeaderElectionService
{
 	public synchronized String getAddress() {
 		return contender.getAddress();
 	}
+
+	/**
+	 * Returns <code>true</code> if {@link #start(LeaderContender)} was called,
+	 * <code>false</code> otherwise.
+	 */
+	public synchronized boolean isStarted() {
+		return contender != null;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ddb674c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index bf85771..ee208ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -26,9 +26,13 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * In-Memory implementation of {@link SubmittedJobGraphStore} for testing purposes.
  */
@@ -36,43 +40,45 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore
{
 
 	private final Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();
 
-	private volatile boolean started;
+	private boolean started;
 
 	@Override
-	public void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception
{
+	public synchronized void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws
Exception {
 		started = true;
 	}
 
 	@Override
-	public void stop() throws Exception {
+	public synchronized void stop() throws Exception {
 		started = false;
 	}
 
 	@Override
-	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+	public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
 		verifyIsStarted();
-		return storedJobs.getOrDefault(jobId, null);
+		return requireNonNull(
+			storedJobs.get(jobId),
+			"Job graph for job " + jobId + " does not exist");
 	}
 
 	@Override
-	public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+	public synchronized void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
 		verifyIsStarted();
 		storedJobs.put(jobGraph.getJobId(), jobGraph);
 	}
 
 	@Override
-	public void removeJobGraph(JobID jobId) throws Exception {
+	public synchronized void removeJobGraph(JobID jobId) throws Exception {
 		verifyIsStarted();
 		storedJobs.remove(jobId);
 	}
 
 	@Override
-	public Collection<JobID> getJobIds() throws Exception {
+	public synchronized Collection<JobID> getJobIds() throws Exception {
 		verifyIsStarted();
-		return storedJobs.keySet();
+		return Collections.unmodifiableSet(new HashSet<>(storedJobs.keySet()));
 	}
 
-	public boolean contains(JobID jobId) {
+	public synchronized boolean contains(JobID jobId) {
 		verifyIsStarted();
 		return storedJobs.containsKey(jobId);
 	}


Mime
View raw message