flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/4] flink git commit: [FLINK-2515] [job manager] Checkpoint coordinator triggers checkpoints only when tasks are running.
Date Fri, 14 Aug 2015 11:26:10 GMT
[FLINK-2515] [job manager] Checkpoint coordinator triggers checkpoints only when tasks are
running.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06e2da35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06e2da35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06e2da35

Branch: refs/heads/master
Commit: 06e2da352fb63f7922f634e6aaf5381d89de57a5
Parents: 4cc8d66
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Aug 13 16:43:53 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Aug 14 12:34:50 2015 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  3 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 54 +++++++++++++++++++-
 .../checkpoint/CheckpointStateRestoreTest.java  | 14 +++--
 3 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06e2da35/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9694132..de83ad9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -23,6 +23,7 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -224,7 +225,7 @@ public class CheckpointCoordinator {
 			ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
 			for (int i = 0; i < tasksToTrigger.length; i++) {
 				Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
-				if (ee != null) {
+				if (ee != null && ee.getState() == ExecutionState.RUNNING) {
 					triggerIDs[i] = ee.getAttemptId();
 				} else {
 					LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting
checkpoint.",

http://git-wip-us.apache.org/repos/asf/flink/blob/06e2da35/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c02d301..cfe77d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -38,7 +39,7 @@ import java.util.List;
  */
 public class CheckpointCoordinatorTest {
 	
-	ClassLoader cl = Thread.currentThread().getContextClassLoader();
+	private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();
 	
 	@Test
 	public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
@@ -83,6 +84,50 @@ public class CheckpointCoordinatorTest {
 	}
 
 	@Test
+	public void testCheckpointAbortsIfTriggerTasksAreFinished() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
+			ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
+			ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2, ExecutionState.FINISHED);
+
+			// create some mock Execution vertices that need to ack the checkpoint
+			final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+			ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 1, 600000,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] {}, cl );
+
+			// nothing should be happening
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should not succeed
+			assertFalse(coord.triggerCheckpoint(timestamp));
+
+			// still, nothing should be happening
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
 		try {
 			final JobID jid = new JobID();
@@ -609,10 +654,15 @@ public class CheckpointCoordinatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
+		return mockExecutionVertex(attemptID, ExecutionState.RUNNING);
+	}
+	
+	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, ExecutionState
state) {
 		final Execution exec = mock(Execution.class);
 		when(exec.getAttemptId()).thenReturn(attemptID);
+		when(exec.getState()).thenReturn(state);
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);

http://git-wip-us.apache.org/repos/asf/flink/blob/06e2da35/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index c23aaca..902eb4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -45,13 +45,13 @@ import static org.mockito.Mockito.*;
  */
 public class CheckpointStateRestoreTest {
 	
-	ClassLoader cl = Thread.currentThread().getContextClassLoader();
+	private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();
 	
 	@Test
 	public void testSetState() {
 		try {
 			final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
-					new LocalStateHandle(new SerializableObject()));
+					new LocalStateHandle<SerializableObject>(new SerializableObject()));
 			
 			final JobID jid = new JobID();
 			final JobVertexID statefulId = new JobVertexID();
@@ -120,7 +120,7 @@ public class CheckpointStateRestoreTest {
 	public void testStateOnlyPartiallyAvailable() {
 		try {
 			final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
-					new LocalStateHandle(new SerializableObject()));
+					new LocalStateHandle<SerializableObject>(new SerializableObject()));
 
 			final JobID jid = new JobID();
 			final JobVertexID statefulId = new JobVertexID();
@@ -208,11 +208,15 @@ public class CheckpointStateRestoreTest {
 	}
 	
 	// ------------------------------------------------------------------------
-	
+
 	private Execution mockExecution() {
+		return mockExecution(ExecutionState.RUNNING);
+	}
+	
+	private Execution mockExecution(ExecutionState state) {
 		Execution mock = mock(Execution.class);
 		when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID());
-		when(mock.getState()).thenReturn(ExecutionState.CREATED);
+		when(mock.getState()).thenReturn(state);
 		return mock;
 	}
 	


Mime
View raw message