flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [3/7] flink git commit: [FLINK-8627] Introduce new JobStatus#SUSPENDING to ExecutionGraph
Date Fri, 23 Feb 2018 09:24:34 GMT
[FLINK-8627] Introduce new JobStatus#SUSPENDING to ExecutionGraph

The new JobStatus#SUSPENDING indicates that an ExecutionGraph has been suspended but its
cleanup has not completed yet. Only after all Executions have been canceled will the
ExecutionGraph enter the SUSPENDED state and complete the termination future
accordingly.

This closes #5445.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e96a248
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e96a248
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e96a248

Branch: refs/heads/master
Commit: 7e96a248587ddbf2eb0ae445eb3079a4b2e4753f
Parents: e8d6f39
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Feb 1 18:04:06 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 22 17:32:37 2018 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 46 +++++++++++-----
 .../flink/runtime/jobgraph/JobStatus.java       |  3 +
 .../ZooKeeperSubmittedJobGraphStore.java        |  2 +-
 .../handler/legacy/ExecutionGraphCache.java     | 16 ++++--
 .../ArchivedExecutionGraphTest.java             |  1 +
 .../ExecutionGraphSuspendTest.java              | 58 +++++++++++++++++---
 .../handler/legacy/ExecutionGraphCacheTest.java | 33 ++++++++---
 .../legacy/JobAccumulatorsHandlerTest.java      |  3 +-
 .../handler/legacy/JobConfigHandlerTest.java    |  3 +-
 .../handler/legacy/JobDetailsHandlerTest.java   |  3 +-
 .../rest/handler/legacy/JobPlanHandlerTest.java |  3 +-
 .../JobVertexAccumulatorsHandlerTest.java       |  3 +-
 .../legacy/JobVertexDetailsHandlerTest.java     |  3 +-
 .../utils/ArchivedJobGenerationUtils.java       |  2 +-
 14 files changed, 133 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index beb3ead..9313466 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1085,10 +1085,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	/**
 	 * Suspends the current ExecutionGraph.
 	 *
-	 * <p>The JobStatus will be directly set to SUSPENDED iff the current state is not
a terminal
+	 * <p>The JobStatus will be directly set to SUSPENDING iff the current state is not
a terminal
 	 * state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed.
 	 *
-	 * <p>The SUSPENDED state is a local terminal state which stops the execution of the
job but does
+	 * <p>The SUSPENDING state is a local terminal state which stops the execution of
the job but does
 	 * not remove the job from the HA job store so that it can be recovered by another JobManager.
 	 *
 	 * @param suspensionCause Cause of the suspension
@@ -1097,10 +1097,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		while (true) {
 			JobStatus currentState = state;
 
-			if (currentState.isTerminalState()) {
+			if (currentState.isTerminalState() || currentState == JobStatus.SUSPENDING) {
 				// stay in a terminal state
 				return;
-			} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
+			} else if (transitionState(currentState, JobStatus.SUSPENDING, suspensionCause)) {
 				initFailureCause(suspensionCause);
 
 				// make sure no concurrent local actions interfere with the cancellation
@@ -1112,16 +1112,25 @@ public class ExecutionGraph implements AccessExecutionGraph {
 				if (ongoingSchedulingFuture != null) {
 					ongoingSchedulingFuture.cancel(false);
 				}
+				final ArrayList<CompletableFuture<Void>> executionJobVertexTerminationFutures
= new ArrayList<>(verticesInCreationOrder.size());
 
 				for (ExecutionJobVertex ejv: verticesInCreationOrder) {
-					ejv.cancel();
+					executionJobVertexTerminationFutures.add(ejv.cancelWithFuture());
 				}
 
-				synchronized (progressLock) {
-					onTerminalState(JobStatus.SUSPENDED);
+				final ConjunctFuture<Void> jobVerticesTerminationFuture = FutureUtils.waitForAll(executionJobVertexTerminationFutures);
 
-					LOG.info("Job {} has been suspended.", getJobID());
-				}
+				jobVerticesTerminationFuture.whenComplete(
+					(Void ignored, Throwable throwable) -> {
+						if (throwable != null) {
+							LOG.debug("Flink could not properly clean up resource after suspension.", throwable);
+						}
+
+						// the globalModVersion does not play a role because there is no way
+						// currently to leave the SUSPENDING state
+						allVerticesInTerminalState(-1L);
+						LOG.info("Job {} has been suspended.", getJobID());
+					});
 
 				return;
 			}
@@ -1144,6 +1153,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			JobStatus current = state;
 			// stay in these states
 			if (current == JobStatus.FAILING ||
+				current == JobStatus.SUSPENDING ||
 				current == JobStatus.SUSPENDED ||
 				current.isGloballyTerminalState()) {
 				return;
@@ -1216,7 +1226,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 				} else if (current == JobStatus.FAILED) {
 					LOG.info("Failed job during restart. Aborting restart.");
 					return;
-				} else if (current == JobStatus.SUSPENDED) {
+				} else if (current == JobStatus.SUSPENDING || current == JobStatus.SUSPENDED) {
 					LOG.info("Suspended job during restart. Aborting restart.");
 					return;
 				} else if (current != JobStatus.RESTARTING) {
@@ -1301,7 +1311,13 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		return null;
 	}
 
-	@VisibleForTesting
+	/**
+	 * Returns the termination future of this {@link ExecutionGraph}. The termination future
+	 * is completed with the terminal {@link JobStatus} once the ExecutionGraph reaches this
+	 * terminal state and all {@link Execution} have been terminated.
+	 *
+	 * @return Termination future of this {@link ExecutionGraph}.
+	 */
 	public CompletableFuture<JobStatus> getTerminationFuture() {
 		return terminationFuture;
 	}
@@ -1441,9 +1457,11 @@ public class ExecutionGraph implements AccessExecutionGraph {
 				}
 				// concurrent job status change, let's check again
 			}
-			else if (current == JobStatus.SUSPENDED) {
-				// we've already cleaned up when entering the SUSPENDED state
-				break;
+			else if (current == JobStatus.SUSPENDING) {
+				if (transitionState(current, JobStatus.SUSPENDED)) {
+					onTerminalState(JobStatus.SUSPENDED);
+					break;
+				}
 			}
 			else if (current.isGloballyTerminalState()) {
 				LOG.warn("Job has entered globally terminal state without waiting for all " +

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 4ef86bd..c04528e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -47,6 +47,9 @@ public enum JobStatus {
 	/** The job is currently undergoing a reset and total restart */
 	RESTARTING(TerminalState.NON_TERMINAL),
 
+	/** The job has been suspended and is currently waiting for the cleanup to complete */
+	SUSPENDING(TerminalState.NON_TERMINAL),
+
 	/**
 	 * The job has been suspended which means that it has been stopped but not been removed
from a
 	 * potential HA job store.

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index f31c970..a60a40d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -372,7 +372,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore
{
 				break;
 
 				case CONNECTION_SUSPENDED: {
-					LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
+					LOG.warn("ZooKeeper connection SUSPENDING. Changes to the submitted job " +
 						"graphs are not monitored (temporarily).");
 				}
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
index 19186c4..382e87e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
@@ -91,19 +91,22 @@ public class ExecutionGraphCache implements Closeable {
 
 			if (oldEntry != null) {
 				if (currentTime < oldEntry.getTTL()) {
-					if (oldEntry.getExecutionGraphFuture().isDone() && !oldEntry.getExecutionGraphFuture().isCompletedExceptionally())
{
+					final CompletableFuture<AccessExecutionGraph> executionGraphFuture = oldEntry.getExecutionGraphFuture();
+					if (executionGraphFuture.isDone() && !executionGraphFuture.isCompletedExceptionally())
{
 
 						// TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager
but only the ArchivedExecutionGraph
 						try {
-							if (oldEntry.getExecutionGraphFuture().get().getState() != JobStatus.SUSPENDED) {
-								return oldEntry.getExecutionGraphFuture();
+							final AccessExecutionGraph executionGraph = executionGraphFuture.get();
+							if (executionGraph.getState() != JobStatus.SUSPENDING &&
+								executionGraph.getState() != JobStatus.SUSPENDED) {
+								return executionGraphFuture;
 							}
 							// send a new request to get the ExecutionGraph from the new leader
 						} catch (InterruptedException | ExecutionException e) {
 							throw new RuntimeException("Could not retrieve ExecutionGraph from the orderly completed
future. This should never happen.", e);
 						}
-					} else if (!oldEntry.getExecutionGraphFuture().isDone()) {
-						return oldEntry.getExecutionGraphFuture();
+					} else if (!executionGraphFuture.isDone()) {
+						return executionGraphFuture;
 					}
 					// otherwise it must be completed exceptionally
 				}
@@ -135,7 +138,8 @@ public class ExecutionGraphCache implements Closeable {
 							newEntry.getExecutionGraphFuture().complete(executionGraph);
 
 							// TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager
but only the ArchivedExecutionGraph
-							if (executionGraph.getState() == JobStatus.SUSPENDED) {
+							if (executionGraph.getState() == JobStatus.SUSPENDING ||
+								executionGraph.getState() == JobStatus.SUSPENDED) {
 								// remove the entry in case of suspension --> triggers new request when accessed
next time
 								cachedExecutionGraphs.remove(jobId, newEntry);
 							}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 8bc5170..f15dca1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -172,6 +172,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELED), archivedGraph.getStatusTimestamp(JobStatus.CANCELED));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FINISHED), archivedGraph.getStatusTimestamp(JobStatus.FINISHED));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING), archivedGraph.getStatusTimestamp(JobStatus.RESTARTING));
+		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDING), archivedGraph.getStatusTimestamp(JobStatus.SUSPENDING));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED), archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED));
 		assertEquals(runtimeGraph.isStoppable(), archivedGraph.isStoppable());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 52d4c81..1b19c53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -49,7 +49,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 public class ExecutionGraphSuspendTest extends TestLogger {
 
 	/**
-	 * Going into SUSPENDED out of CREATED should immediately cancel everything and
+	 * Going into SUSPENDING out of CREATED should immediately cancel everything and
 	 * not send out RPC calls.
 	 */
 	@Test
@@ -72,7 +72,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 	}
 
 	/**
-	 * Going into SUSPENDED out of DEPLOYING vertices should cancel all vertices once with RPC
calls.
+	 * Going into SUSPENDING out of DEPLOYING vertices should cancel all vertices once with
RPC calls.
 	 */
 	@Test
 	public void testSuspendedOutOfDeploying() throws Exception {
@@ -88,15 +88,20 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 
 		eg.suspend(new Exception("suspend"));
 
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		assertEquals(JobStatus.SUSPENDING, eg.getState());
+
 		validateAllVerticesInState(eg, ExecutionState.CANCELING);
 		validateCancelRpcCalls(gateway, parallelism);
 
+		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+
 		ensureCannotLeaveSuspendedState(eg, gateway);
 	}
 
 	/**
-	 * Going into SUSPENDED out of RUNNING vertices should cancel all vertices once with RPC
calls.
+	 * Going into SUSPENDING out of RUNNING vertices should cancel all vertices once with RPC
calls.
 	 */
 	@Test
 	public void testSuspendedOutOfRunning() throws Exception {
@@ -114,15 +119,21 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 
 		eg.suspend(new Exception("suspend"));
 
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		assertEquals(JobStatus.SUSPENDING, eg.getState());
+
 		validateAllVerticesInState(eg, ExecutionState.CANCELING);
+
 		validateCancelRpcCalls(gateway, parallelism);
 
+		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+
 		ensureCannotLeaveSuspendedState(eg, gateway);
 	}
 
 	/**
-	 * Suspending from FAILING goes to SUSPENDED and sends no additional RPC calls
+	 * Suspending from FAILING goes to SUSPENDING and sends no additional RPC calls
 	 */
 	@Test
 	public void testSuspendedOutOfFailing() throws Exception {
@@ -140,10 +151,14 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 
 		// suspend
 		eg.suspend(new Exception("suspend"));
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		assertEquals(JobStatus.SUSPENDING, eg.getState());
+
+		ensureCannotLeaveSuspendingState(eg, gateway);
 
 		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
 
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+
 		ensureCannotLeaveSuspendedState(eg, gateway);
 	}
 
@@ -176,7 +191,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 	}
 
 	/**
-	 * Suspending from CANCELING goes to SUSPENDED and sends no additional RPC calls. 
+	 * Suspending from CANCELING goes to SUSPENDING and sends no additional RPC calls.
 	 */
 	@Test
 	public void testSuspendedOutOfCanceling() throws Exception {
@@ -194,10 +209,14 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 
 		// suspend
 		eg.suspend(new Exception("suspend"));
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		assertEquals(JobStatus.SUSPENDING, eg.getState());
+
+		ensureCannotLeaveSuspendingState(eg, gateway);
 
 		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
 
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+
 		ensureCannotLeaveSuspendedState(eg, gateway);
 	}
 
@@ -280,6 +299,27 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 		}
 	}
 
+	private static void ensureCannotLeaveSuspendingState(ExecutionGraph eg, TaskManagerGateway
gateway) {
+		assertEquals(JobStatus.SUSPENDING, eg.getState());
+		reset(gateway);
+
+		eg.failGlobal(new Exception("fail"));
+		assertEquals(JobStatus.SUSPENDING, eg.getState());
+		verifyNoMoreInteractions(gateway);
+
+		eg.cancel();
+		assertEquals(JobStatus.SUSPENDING, eg.getState());
+		verifyNoMoreInteractions(gateway);
+
+		eg.suspend(new Exception("suspend again"));
+		assertEquals(JobStatus.SUSPENDING, eg.getState());
+		verifyNoMoreInteractions(gateway);
+
+		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+			assertEquals(0, ev.getCurrentExecutionAttempt().getAttemptNumber());
+		}
+	}
+
 	private static void validateAllVerticesInState(ExecutionGraph eg, ExecutionState expected)
{
 		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
 			assertEquals(expected, ev.getCurrentExecutionAttempt().getState());

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
index 8bdaff5..3afd9fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
@@ -248,7 +248,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
 
 	/**
 	 * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph}
is in
-	 * state {@link JobStatus#SUSPENDED}.
+	 * state {@link JobStatus#SUSPENDING} or {@link JobStatus#SUSPENDED}.
 	 *
 	 * <p>This test can be removed once we no longer request the actual {@link ExecutionGraph}
from the
 	 * {@link JobManager}.
@@ -259,9 +259,11 @@ public class ExecutionGraphCacheTest extends TestLogger {
 		final Time timeToLive = Time.hours(1L);
 		final JobID expectedJobId = new JobID();
 
+		final ArchivedExecutionGraph suspendingExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDING).build();
 		final ArchivedExecutionGraph suspendedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDED).build();
 		final ConcurrentLinkedQueue<CompletableFuture<? extends AccessExecutionGraph>>
requestJobAnswers = new ConcurrentLinkedQueue<>();
 
+		requestJobAnswers.offer(CompletableFuture.completedFuture(suspendingExecutionGraph));
 		requestJobAnswers.offer(CompletableFuture.completedFuture(suspendedExecutionGraph));
 		requestJobAnswers.offer(CompletableFuture.completedFuture(expectedExecutionGraph));
 
@@ -278,17 +280,21 @@ public class ExecutionGraphCacheTest extends TestLogger {
 		try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive))
{
 			CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId,
restfulGateway);
 
+			assertEquals(suspendingExecutionGraph, executionGraphFuture.get());
+
+			executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+
 			assertEquals(suspendedExecutionGraph, executionGraphFuture.get());
 
-			CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId,
restfulGateway);
+			executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-			assertEquals(expectedExecutionGraph, executionGraphFuture2.get());
+			assertEquals(expectedExecutionGraph, executionGraphFuture.get());
 		}
 	}
 
 	/**
 	 * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph}
changes its
-	 * state to {@link JobStatus#SUSPENDED}.
+	 * state to {@link JobStatus#SUSPENDING} or {@link JobStatus#SUSPENDED}.
 	 *
 	 * <p>This test can be removed once we no longer request the actual {@link ExecutionGraph}
from the
 	 * {@link JobManager}.
@@ -299,30 +305,39 @@ public class ExecutionGraphCacheTest extends TestLogger {
 		final Time timeToLive = Time.hours(1L);
 		final JobID expectedJobId = new JobID();
 
+		final SuspendableAccessExecutionGraph toBeSuspendingExecutionGraph = new SuspendableAccessExecutionGraph(expectedJobId);
 		final SuspendableAccessExecutionGraph toBeSuspendedExecutionGraph = new SuspendableAccessExecutionGraph(expectedJobId);
 
 		final CountingRestfulGateway restfulGateway = createCountingRestfulGateway(
 			expectedJobId,
+			CompletableFuture.completedFuture(toBeSuspendingExecutionGraph),
 			CompletableFuture.completedFuture(toBeSuspendedExecutionGraph),
 			CompletableFuture.completedFuture(expectedExecutionGraph));
 
 		try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive))
{
 			CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId,
restfulGateway);
 
+			assertEquals(toBeSuspendingExecutionGraph, executionGraphFuture.get());
+
+			toBeSuspendingExecutionGraph.setJobStatus(JobStatus.SUSPENDING);
+
+			// retrieve the same job from the cache again --> this should return it and invalidate
the cache entry
+			executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+
 			assertEquals(toBeSuspendedExecutionGraph, executionGraphFuture.get());
 
 			toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED);
 
 			// retrieve the same job from the cache again --> this should return it and invalidate
the cache entry
-			CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId,
restfulGateway);
+			executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-			assertEquals(expectedExecutionGraph, executionGraphFuture2.get());
+			assertEquals(expectedExecutionGraph, executionGraphFuture.get());
 
-			CompletableFuture<AccessExecutionGraph> executionGraphFuture3 = executionGraphCache.getExecutionGraph(expectedJobId,
restfulGateway);
+			executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-			assertEquals(expectedExecutionGraph, executionGraphFuture3.get());
+			assertEquals(expectedExecutionGraph, executionGraphFuture.get());
 
-			assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2));
+			assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(3));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
index f8122ee..00829e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -38,7 +39,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobAccumulatorsHandler.
  */
-public class JobAccumulatorsHandlerTest {
+public class JobAccumulatorsHandlerTest extends TestLogger {
 
 	@Test
 	public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
index d173e0f..2279cd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
@@ -39,7 +40,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobConfigHandler.
  */
-public class JobConfigHandlerTest {
+public class JobConfigHandlerTest extends TestLogger {
 
 	@Test
 	public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
index 2980a08..dbfa8cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -46,7 +47,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobDetailsHandler.
  */
-public class JobDetailsHandlerTest {
+public class JobDetailsHandlerTest extends TestLogger {
 
 	@Test
 	public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
index 29e0819..9edaef1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -34,7 +35,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobPlanHandler.
  */
-public class JobPlanHandlerTest {
+public class JobPlanHandlerTest extends TestLogger {
 
 	@Test
 	public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
index 97356f4..abb22e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -40,7 +41,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobVertexAccumulatorsHandler.
  */
-public class JobVertexAccumulatorsHandlerTest {
+public class JobVertexAccumulatorsHandlerTest extends TestLogger {
 
 	@Test
 	public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
index 9cc294a..0c52171 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationU
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -42,7 +43,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobVertexDetailsHandler.
  */
-public class JobVertexDetailsHandlerTest {
+public class JobVertexDetailsHandlerTest extends TestLogger {
 
 	@Test
 	public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
index ad3a95f..92b0d8a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
@@ -135,7 +135,7 @@ public class ArchivedJobGenerationUtils {
 			.setTasks(tasks)
 			.setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED)))
 			.setState(JobStatus.FINISHED)
-			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
+			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11})
 			.setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
 			.build();
 	}


Mime
View raw message