Repository: flink
Updated Branches:
refs/heads/master 094b747a3 -> ba2d007e5
[FLINK-4972] Fix CoordinatorShutdownTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99f1dc3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99f1dc3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99f1dc3e
Branch: refs/heads/master
Commit: 99f1dc3e24ec529852ce38bcb9c46ffaf749333d
Parents: 094b747
Author: zentol <chesnay@apache.org>
Authored: Mon Oct 31 14:15:38 2016 +0100
Committer: zentol <chesnay@apache.org>
Committed: Mon Oct 31 15:12:01 2016 +0100
----------------------------------------------------------------------
.../checkpoint/CoordinatorShutdownTest.java | 79 +++++++++++++++++---
1 file changed, 68 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/99f1dc3e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 0b2f4f3..777ba9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -26,9 +26,9 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -51,14 +51,15 @@ public class CoordinatorShutdownTest {
public void testCoordinatorShutsDownOnFailure() {
LocalFlinkMiniCluster cluster = null;
try {
- Configuration noTaskManagerConfig = new Configuration();
- noTaskManagerConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 0);
- cluster = new LocalFlinkMiniCluster(noTaskManagerConfig, true);
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ cluster = new LocalFlinkMiniCluster(config, true);
cluster.start();
// build a test graph with snapshotting enabled
JobVertex vertex = new JobVertex("Test Vertex");
- vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ vertex.setInvokableClass(FailingBlockingInvokable.class);
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
JobGraph testGraph = new JobGraph("test job", vertex);
@@ -72,11 +73,11 @@ public class CoordinatorShutdownTest {
testGraph,
ListeningBehaviour.EXECUTION_RESULT);
- // submit is successful, but then the job dies because no TaskManager / slot is available
+ // submit is successful, but then the job blocks due to the invokable
Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout);
Await.result(submitFuture, timeout);
- // get the execution graph and make sure the coordinator is properly shut down
+ // get the execution graph and store the ExecutionGraph reference
Future<Object> jobRequestFuture = jmGateway.ask(
new JobManagerMessages.RequestJob(testGraph.getJobID()),
timeout);
@@ -84,8 +85,12 @@ public class CoordinatorShutdownTest {
ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture,
timeout)).executionGraph();
assertNotNull(graph);
+
+ FailingBlockingInvokable.unblock();
+
graph.waitUntilFinished();
+ // verify that the coordinator was shut down
CheckpointCoordinator coord = graph.getCheckpointCoordinator();
assertTrue(coord == null || coord.isShutdown());
}
@@ -105,12 +110,15 @@ public class CoordinatorShutdownTest {
public void testCoordinatorShutsDownOnSuccess() {
LocalFlinkMiniCluster cluster = null;
try {
- cluster = new LocalFlinkMiniCluster(new Configuration(), true);
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ cluster = new LocalFlinkMiniCluster(config, true);
cluster.start();
// build a test graph with snapshotting enabled
JobVertex vertex = new JobVertex("Test Vertex");
- vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ vertex.setInvokableClass(BlockingInvokable.class);
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
JobGraph testGraph = new JobGraph("test job", vertex);
@@ -124,11 +132,11 @@ public class CoordinatorShutdownTest {
testGraph,
ListeningBehaviour.EXECUTION_RESULT);
- // submit is successful, but then the job dies because no TaskManager / slot is available
+ // submit is successful, but then the job blocks due to the invokable
Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout);
Await.result(submitFuture, timeout);
- // get the execution graph and make sure the coordinator is properly shut down
+ // get the execution graph and store the ExecutionGraph reference
Future<Object> jobRequestFuture = jmGateway.ask(
new JobManagerMessages.RequestJob(testGraph.getJobID()),
timeout);
@@ -136,8 +144,12 @@ public class CoordinatorShutdownTest {
ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture,
timeout)).executionGraph();
assertNotNull(graph);
+
+ BlockingInvokable.unblock();
+
graph.waitUntilFinished();
+ // verify that the coordinator was shut down
CheckpointCoordinator coord = graph.getCheckpointCoordinator();
assertTrue(coord == null || coord.isShutdown());
}
@@ -152,4 +164,49 @@ public class CoordinatorShutdownTest {
}
}
}
+
+ /**
+  * Invokable whose {@link #invoke()} keeps the calling thread waiting until
+  * {@link #unblock()} has been called.
+  *
+  * <p>The {@code blocking} flag is static and is only ever switched to {@code false}
+  * by this class, so once released, every later {@code invoke()} returns immediately.
+  */
+ public static class BlockingInvokable extends AbstractInvokable {
+     /** Guarded by {@link #lock}: read and written only while holding the monitor. */
+     private static boolean blocking = true;
+     private static final Object lock = new Object();
+
+     /**
+      * Waits on {@link #lock} until the blocking flag has been cleared.
+      *
+      * @throws Exception if the waiting thread is interrupted
+      */
+     @Override
+     public void invoke() throws Exception {
+         // The flag must be checked while holding the monitor. Checking it first and
+         // acquiring the monitor afterwards lets unblock() run in between, so the
+         // notification is missed and wait() never returns.
+         synchronized (lock) {
+             // Loop to tolerate spurious wake-ups.
+             while (blocking) {
+                 lock.wait();
+             }
+         }
+     }
+
+     /** Clears the blocking flag and wakes up every thread waiting in {@link #invoke()}. */
+     public static void unblock() {
+         synchronized (lock) {
+             // Writing under the monitor makes the update visible to the waiting thread.
+             blocking = false;
+             lock.notifyAll();
+         }
+     }
+ }
+
+ /**
+  * Invokable whose {@link #invoke()} keeps the calling thread waiting until
+  * {@link #unblock()} has been called and then fails with a {@link RuntimeException}.
+  *
+  * <p>The {@code blocking} flag is static and is only ever switched to {@code false}
+  * by this class, so once released, every later {@code invoke()} fails immediately.
+  */
+ public static class FailingBlockingInvokable extends AbstractInvokable {
+     /** Guarded by {@link #lock}: read and written only while holding the monitor. */
+     private static boolean blocking = true;
+     private static final Object lock = new Object();
+
+     /**
+      * Waits on {@link #lock} until the blocking flag has been cleared, then throws.
+      *
+      * @throws RuntimeException always, once the flag has been cleared
+      * @throws Exception if the waiting thread is interrupted
+      */
+     @Override
+     public void invoke() throws Exception {
+         // The flag must be checked while holding the monitor. Checking it first and
+         // acquiring the monitor afterwards lets unblock() run in between, so the
+         // notification is missed and wait() never returns.
+         synchronized (lock) {
+             // Loop to tolerate spurious wake-ups.
+             while (blocking) {
+                 lock.wait();
+             }
+         }
+         throw new RuntimeException("This exception is expected.");
+     }
+
+     /** Clears the blocking flag and wakes up every thread waiting in {@link #invoke()}. */
+     public static void unblock() {
+         synchronized (lock) {
+             // Writing under the monitor makes the update visible to the waiting thread.
+             blocking = false;
+             lock.notifyAll();
+         }
+     }
+ }
}
|