flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3261] Allow Task to decline checkpoint request if not ready
Date Mon, 25 Jan 2016 10:37:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1a5ce4da9 -> 893a62f54


[FLINK-3261] Allow Task to decline checkpoint request if not ready

Before, it could happen that a StreamingTask receives a Checkpoint
Trigger message while internally not being ready. The checkpoint
coordinator would then wait for the specified timeout interval before
continuing. Now, tasks can signal that they are not ready and the
checkpoint coordinator will discard a checkpoint for which this is the
case and trigger new checkpoints if necessary.

The newly triggered checkpoints will also release alignment locks in
streaming tasks that are still waiting for barriers from failed
checkpoints.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/893a62f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/893a62f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/893a62f5

Branch: refs/heads/master
Commit: 893a62f5428b7088a1111ecc5a32ea5bd92df379
Parents: 1a5ce4d
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Jan 21 17:21:09 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Jan 25 11:28:23 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  81 ++++++
 .../runtime/jobgraph/tasks/StatefulTask.java    |   4 +-
 .../messages/checkpoint/DeclineCheckpoint.java  |  74 ++++++
 .../apache/flink/runtime/taskmanager/Task.java  |   7 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  44 +++-
 .../checkpoint/CheckpointCoordinatorTest.java   | 247 ++++++++++++++++++-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +-
 .../connectors/kafka/KafkaConsumerTestBase.java |   1 -
 .../streaming/runtime/tasks/StreamTask.java     |   5 +-
 9 files changed, 453 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/893a62f5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 70997c9..9963a20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.slf4j.Logger;
@@ -505,6 +506,86 @@ public class CheckpointCoordinator {
 	}
 
 	/**
+	 * Receives a {@link DeclineCheckpoint} message and returns whether the
+	 * message was associated with a pending checkpoint.
+	 *
+	 * @param message Checkpoint decline from the task manager
+	 *
+	 * @return Flag indicating whether the declined checkpoint was associated
+	 * with a pending checkpoint.
+	 */
+	public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
+		if (shutdown || message == null) {
+			return false;
+		}
+		if (!job.equals(message.getJob())) {
+			LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
+			return false;
+		}
+
+		final long checkpointId = message.getCheckpointId();
+
+		CompletedCheckpoint completed = null;
+		PendingCheckpoint checkpoint;
+
+		// Flag indicating whether the ack message was for a known pending
+		// checkpoint.
+		boolean isPendingCheckpoint;
+
+		synchronized (lock) {
+			// we need to check inside the lock for being shutdown as well, otherwise we
+			// get races and invalid error log messages
+			if (shutdown) {
+				return false;
+			}
+
+			checkpoint = pendingCheckpoints.get(checkpointId);
+
+			if (checkpoint != null && !checkpoint.isDiscarded()) {
+				isPendingCheckpoint = true;
+
+				LOG.info("Discarding checkpoint " + checkpointId
+					+ " because of checkpoint decline from task " + message.getTaskExecutionId());
+
+				pendingCheckpoints.remove(checkpointId);
+				checkpoint.discard(userClassLoader);
+				rememberRecentCheckpointId(checkpointId);
+
+				boolean haveMoreRecentPending = false;
+				Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
+				while (entries.hasNext()) {
+					PendingCheckpoint p = entries.next().getValue();
+					if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp())
{
+						haveMoreRecentPending = true;
+						break;
+					}
+				}
+				if (!haveMoreRecentPending && !triggerRequestQueued) {
+					LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
+					triggerCheckpoint(System.currentTimeMillis());
+				} else if (!haveMoreRecentPending) {
+					LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
+					triggerQueuedRequests();
+				}
+			} else if (checkpoint != null) {
+				// this should not happen
+				throw new IllegalStateException(
+					"Received message for discarded but non-removed checkpoint " + checkpointId);
+			} else {
+				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
+				if (recentPendingCheckpoints.contains(checkpointId)) {
+					isPendingCheckpoint = true;
+					LOG.info("Received another decline checkpoint message for now expired checkpoint attempt
" + checkpointId);
+				} else {
+					isPendingCheckpoint = false;
+				}
+			}
+		}
+
+		return isPendingCheckpoint;
+	}
+
+	/**
 	 * Receives an AcknowledgeCheckpoint message and returns whether the
 	 * message was associated with a pending checkpoint.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/893a62f5/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index fac4ec4..aca1bc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -43,8 +43,10 @@ public interface StatefulTask<T extends StateHandle<?>> {
 	 *
 	 * @param checkpointId The ID of the checkpoint, incrementing.
 	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+	 *
+	 * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
 	 */
-	void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
+	boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
 
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/893a62f5/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
new file mode 100644
index 0000000..f26d2fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager}
to the
+ * {@link org.apache.flink.runtime.jobmanager.JobManager} to tell the checkpoint coordinator
+ * that a checkpoint request could not be heeded. This can happen if a Task is already in
+ * RUNNING state but is internally not yet ready to perform checkpoints.
+ */
+public class DeclineCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable
{
+
+	private static final long serialVersionUID = 2094094662279578953L;
+
+	/** The timestamp associated with the checkpoint */
+	private final long timestamp;
+
+	public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId,
long timestamp) {
+		super(job, taskExecutionId, checkpointId);
+		this.timestamp = timestamp;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o instanceof DeclineCheckpoint) {
+			DeclineCheckpoint that = (DeclineCheckpoint) o;
+			return this.timestamp == that.timestamp && super.equals(o);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return String.format("Declined Checkpoint %d@%d for (%s/%s)",
+				getCheckpointId(), getTimestamp(), getJob(), getTaskExecutionId());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/893a62f5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 974e687..9cc1be4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateUtils;
 import org.apache.flink.util.SerializedValue;
@@ -874,7 +875,11 @@ public class Task implements Runnable {
 					@Override
 					public void run() {
 						try {
-							statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
+							boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
+							if (!success) {
+								DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID,
checkpointTimestamp);
+								jobManager.tell(decline);
+							}
 						}
 						catch (Throwable t) {
 							failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName,
t));

http://git-wip-us.apache.org/repos/asf/flink/blob/893a62f5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7a4a78f..dc17742 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound,
AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified}
-import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
+import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, AbstractCheckpointMessage,
AcknowledgeCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
@@ -1153,12 +1153,50 @@ class JobManager(
             }
             else {
               log.error(
-                s"Received ConfirmCheckpoint message for job $jid with no CheckpointCoordinator")
+                s"Received AcknowledgeCheckpoint message for job $jid with no " +
+                  s"CheckpointCoordinator")
             }
 
-          case None => log.error(s"Received ConfirmCheckpoint for unavailable job $jid")
+          case None => log.error(s"Received AcknowledgeCheckpoint for unavailable job
$jid")
         }
 
+      case declineMessage: DeclineCheckpoint =>
+        val jid = declineMessage.getJob()
+        currentJobs.get(jid) match {
+          case Some((graph, _)) =>
+            val checkpointCoordinator = graph.getCheckpointCoordinator()
+            val savepointCoordinator = graph.getSavepointCoordinator()
+
+            if (checkpointCoordinator != null && savepointCoordinator != null) {
+              future {
+                try {
+                  if (checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
+                    // OK, this is the common case
+                  }
+                  else {
+                    // Try the savepoint coordinator if the message was not addressed
+                    // to the periodic checkpoint coordinator.
+                    if (!savepointCoordinator.receiveDeclineMessage(declineMessage)) {
+                      log.info("Received message for non-existing checkpoint " +
+                        declineMessage.getCheckpointId)
+                    }
+                  }
+                }
+                catch {
+                  case t: Throwable =>
+                    log.error(s"Error in CheckpointCoordinator while processing $declineMessage",
t)
+                }
+              }(context.dispatcher)
+            }
+            else {
+              log.error(
+                s"Received DeclineCheckpoint message for job $jid with no CheckpointCoordinator")
+            }
+
+          case None => log.error(s"Received DeclineCheckpoint for unavailable job $jid")
+        }
+
+
       // unknown checkpoint message
       case _ => unhandled(actorMessage)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/893a62f5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 5ceec6c..d385d73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 
@@ -38,14 +39,11 @@ import org.mockito.stubbing.Answer;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
@@ -194,6 +192,245 @@ public class CheckpointCoordinatorTest {
 		}
 	}
 
+	/**
+	 * This test triggers a checkpoint and then sends a decline checkpoint message from
+	 * one of the tasks. The expected behaviour is that said checkpoint is discarded and a new
+	 * checkpoint is triggered.
+	 */
+	@Test
+	public void testTriggerAndDeclineCheckpointSimple() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+				jid, 600000, 600000,
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				cl,
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
+
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp));
+
+			// validate that we have a pending checkpoint
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+
+			assertNotNull(checkpoint);
+			assertEquals(checkpointId, checkpoint.getCheckpointId());
+			assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
+			assertEquals(jid, checkpoint.getJobId());
+			assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
+			assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
+			assertEquals(0, checkpoint.getCollectedStates().size());
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+			// check that the vertices received the trigger checkpoint message
+			{
+				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointId,
timestamp);
+				TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointId,
timestamp);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
+			}
+
+			// acknowledge from one of the tasks
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
+			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+			// acknowledge the same task again (should not matter)
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+
+			// decline checkpoint from the other task, this should cancel the checkpoint
+			// and trigger a new one
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
+			assertTrue(checkpoint.isDiscarded());
+
+			// validate that we have a new pending checkpoint
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
+
+			assertNotNull(checkpointNew);
+			assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
+			assertEquals(jid, checkpointNew.getJobId());
+			assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
+			assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
+			assertEquals(0, checkpointNew.getCollectedStates().size());
+			assertFalse(checkpointNew.isDiscarded());
+			assertFalse(checkpointNew.isFullyAcknowledged());
+			assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId());
+
+			// check that the vertices received the new trigger checkpoint message
+			{
+				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew,
checkpointNew.getCheckpointTimestamp());
+				TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew,
checkpointNew.getCheckpointTimestamp());
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
+			}
+
+			// decline again, nothing should happen
+			// decline from the other task, nothing should happen
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId, checkpoint.getCheckpointTimestamp()));
+			assertTrue(checkpoint.isDiscarded());
+
+			// should still have the same second checkpoint pending
+			long checkpointIdNew2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			assertEquals(checkpointIdNew2, checkpointIdNew);
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * This test triggers two checkpoints and then sends a decline message from one of the tasks
+	 * for the first checkpoint. This should discard the first checkpoint while not triggering
+	 * a new checkpoint because a later checkpoint is already in progress.
+	 */
+	@Test
+	public void testTriggerAndDeclineCheckpointComplex() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+				jid, 600000, 600000,
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				cl,
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
+
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp));
+
+			// trigger second checkpoint, should also succeed
+			assertTrue(coord.triggerCheckpoint(timestamp + 2));
+
+			// validate that we have a pending checkpoint
+			assertEquals(2, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
+			long checkpoint1Id = it.next().getKey();
+			long checkpoint2Id = it.next().getKey();
+			PendingCheckpoint checkpoint1 = coord.getPendingCheckpoints().get(checkpoint1Id);
+			PendingCheckpoint checkpoint2 = coord.getPendingCheckpoints().get(checkpoint2Id);
+
+			assertNotNull(checkpoint1);
+			assertEquals(checkpoint1Id, checkpoint1.getCheckpointId());
+			assertEquals(timestamp, checkpoint1.getCheckpointTimestamp());
+			assertEquals(jid, checkpoint1.getJobId());
+			assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
+			assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
+			assertEquals(0, checkpoint1.getCollectedStates().size());
+			assertFalse(checkpoint1.isDiscarded());
+			assertFalse(checkpoint1.isFullyAcknowledged());
+
+			assertNotNull(checkpoint2);
+			assertEquals(checkpoint2Id, checkpoint2.getCheckpointId());
+			assertEquals(timestamp + 2, checkpoint2.getCheckpointTimestamp());
+			assertEquals(jid, checkpoint2.getJobId());
+			assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
+			assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
+			assertEquals(0, checkpoint2.getCollectedStates().size());
+			assertFalse(checkpoint2.isDiscarded());
+			assertFalse(checkpoint2.isFullyAcknowledged());
+
+			// check that the vertices received the trigger checkpoint message
+			{
+				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpoint1Id,
timestamp);
+				TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpoint1Id,
timestamp);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
+			}
+
+			// check that the vertices received the trigger checkpoint message for the second checkpoint
+			{
+				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpoint2Id,
timestamp + 2);
+				TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpoint2Id,
timestamp + 2);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
+			}
+
+			// decline checkpoint from one of the tasks, this should cancel the checkpoint
+			// and trigger a new one
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+			assertTrue(checkpoint1.isDiscarded());
+
+			// validate that we have only one pending checkpoint left
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// validate that it is the same second checkpoint from earlier
+			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
+			assertEquals(checkpoint2Id, checkpointIdNew);
+
+			assertNotNull(checkpointNew);
+			assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
+			assertEquals(jid, checkpointNew.getJobId());
+			assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
+			assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
+			assertEquals(0, checkpointNew.getCollectedStates().size());
+			assertFalse(checkpointNew.isDiscarded());
+			assertFalse(checkpointNew.isFullyAcknowledged());
+			assertNotEquals(checkpoint1.getCheckpointId(), checkpointNew.getCheckpointId());
+
+			// decline again, nothing should happen
+			// decline from the other task, nothing should happen
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+			assertTrue(checkpoint1.isDiscarded());
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
 	@Test
 	public void testTriggerAndConfirmSimpleCheckpoint() {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/893a62f5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 3f9442a..d5c9e85 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -204,7 +204,7 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public void triggerCheckpoint(long checkpointId, long timestamp) {
+		public boolean triggerCheckpoint(long checkpointId, long timestamp) {
 			lastCheckpointId++;
 			if (checkpointId == lastCheckpointId) {
 				if (lastCheckpointId == NUM_CALLS) {
@@ -217,6 +217,7 @@ public class TaskAsyncCallTest {
 					notifyAll();
 				}
 			}
+			return true;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/893a62f5/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 1bd01a2..8592182 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1137,7 +1137,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 								final int sourceParallelism,
 								final String topicName,
 								final int valuesCount, final int startFrom) throws Exception {
-		env.getCheckpointConfig().setCheckpointTimeout(5000); // set timeout for checkpoints to
5 seconds
 
 		final int finalCount = valuesCount * sourceParallelism;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/893a62f5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b67a98e..72f74ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -444,7 +444,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 	@Override
 	@SuppressWarnings("unchecked,rawtypes")
-	public void triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception
{
+	public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception
{
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
 		
 		synchronized (lock) {
@@ -526,6 +526,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 						throw e;
 					}
 				}
+				return true;
+			} else {
+				return false;
 			}
 		}
 	}


Mime
View raw message