flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/5] flink git commit: [FLINK-4972] Fix CoordinatorShutdownTest
Date Mon, 31 Oct 2016 15:08:29 GMT
Repository: flink
Updated Branches:
  refs/heads/master 094b747a3 -> ba2d007e5


[FLINK-4972] Fix CoordinatorShutdownTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99f1dc3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99f1dc3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99f1dc3e

Branch: refs/heads/master
Commit: 99f1dc3e24ec529852ce38bcb9c46ffaf749333d
Parents: 094b747
Author: zentol <chesnay@apache.org>
Authored: Mon Oct 31 14:15:38 2016 +0100
Committer: zentol <chesnay@apache.org>
Committed: Mon Oct 31 15:12:01 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CoordinatorShutdownTest.java     | 79 +++++++++++++++++---
 1 file changed, 68 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99f1dc3e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 0b2f4f3..777ba9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -26,9 +26,9 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -51,14 +51,15 @@ public class CoordinatorShutdownTest {
 	public void testCoordinatorShutsDownOnFailure() {
 		LocalFlinkMiniCluster cluster = null;
 		try {
-			Configuration noTaskManagerConfig = new Configuration();
-			noTaskManagerConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 0);
-			cluster = new LocalFlinkMiniCluster(noTaskManagerConfig, true);
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			cluster = new LocalFlinkMiniCluster(config, true);
 			cluster.start();
 			
 			// build a test graph with snapshotting enabled
 			JobVertex vertex = new JobVertex("Test Vertex");
-			vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			vertex.setInvokableClass(FailingBlockingInvokable.class);
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 			
 			JobGraph testGraph = new JobGraph("test job", vertex);
@@ -72,11 +73,11 @@ public class CoordinatorShutdownTest {
 					testGraph,
 					ListeningBehaviour.EXECUTION_RESULT);
 			
-			// submit is successful, but then the job dies because no TaskManager / slot is available
+			// submit is successful, but then the job blocks due to the invokable
 			Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout);
 			Await.result(submitFuture, timeout);
 
-			// get the execution graph and make sure the coordinator is properly shut down
+			// get the execution graph and store the ExecutionGraph reference
 			Future<Object> jobRequestFuture = jmGateway.ask(
 					new JobManagerMessages.RequestJob(testGraph.getJobID()),
 					timeout);
@@ -84,8 +85,12 @@ public class CoordinatorShutdownTest {
 			ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture,
timeout)).executionGraph();
 			
 			assertNotNull(graph);
+
+			FailingBlockingInvokable.unblock();
+
 			graph.waitUntilFinished();
 			
+			// verify that the coordinator was shut down
 			CheckpointCoordinator coord = graph.getCheckpointCoordinator();
 			assertTrue(coord == null || coord.isShutdown());
 		}
@@ -105,12 +110,15 @@ public class CoordinatorShutdownTest {
 	public void testCoordinatorShutsDownOnSuccess() {
 		LocalFlinkMiniCluster cluster = null;
 		try {
-			cluster = new LocalFlinkMiniCluster(new Configuration(), true);
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			cluster = new LocalFlinkMiniCluster(config, true);
 			cluster.start();
 			
 			// build a test graph with snapshotting enabled
 			JobVertex vertex = new JobVertex("Test Vertex");
-			vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			vertex.setInvokableClass(BlockingInvokable.class);
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 
 			JobGraph testGraph = new JobGraph("test job", vertex);
@@ -124,11 +132,11 @@ public class CoordinatorShutdownTest {
 					testGraph,
 					ListeningBehaviour.EXECUTION_RESULT);
 
-			// submit is successful, but then the job dies because no TaskManager / slot is available
+			// submit is successful, but then the job blocks due to the invokable
 			Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout);
 			Await.result(submitFuture, timeout);
 
-			// get the execution graph and make sure the coordinator is properly shut down
+			// get the execution graph and store the ExecutionGraph reference
 			Future<Object> jobRequestFuture = jmGateway.ask(
 					new JobManagerMessages.RequestJob(testGraph.getJobID()),
 					timeout);
@@ -136,8 +144,12 @@ public class CoordinatorShutdownTest {
 			ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture,
timeout)).executionGraph();
 
 			assertNotNull(graph);
+
+			BlockingInvokable.unblock();
+			
 			graph.waitUntilFinished();
 
+			// verify that the coordinator was shut down
 			CheckpointCoordinator coord = graph.getCheckpointCoordinator();
 			assertTrue(coord == null || coord.isShutdown());
 		}
@@ -152,4 +164,49 @@ public class CoordinatorShutdownTest {
 			}
 		}
 	}
+
+	public static class BlockingInvokable extends AbstractInvokable {
+		private static boolean blocking = true;
+		private static final Object lock = new Object();
+
+		@Override
+		public void invoke() throws Exception {
+			while (blocking) {
+				synchronized (lock) {
+					lock.wait();
+				}
+			}
+		}
+
+		public static void unblock() {
+			blocking = false;
+
+			synchronized (lock) {
+				lock.notifyAll();
+			}
+		}
+	}
+
+	public static class FailingBlockingInvokable extends AbstractInvokable {
+		private static boolean blocking = true;
+		private static final Object lock = new Object();
+
+		@Override
+		public void invoke() throws Exception {
+			while (blocking) {
+				synchronized (lock) {
+					lock.wait();
+				}
+			}
+			throw new RuntimeException("This exception is expected.");
+		}
+
+		public static void unblock() {
+			blocking = false;
+
+			synchronized (lock) {
+				lock.notifyAll();
+			}
+		}
+	}
 }


Mime
View raw message