Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4E81318E75 for ; Tue, 12 May 2015 21:03:13 +0000 (UTC) Received: (qmail 86743 invoked by uid 500); 12 May 2015 21:03:13 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 86648 invoked by uid 500); 12 May 2015 21:03:13 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 86611 invoked by uid 99); 12 May 2015 21:03:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 May 2015 21:03:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CD714E0B3F; Tue, 12 May 2015 21:03:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Tue, 12 May 2015 21:03:13 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/10] flink git commit: [FLINK-1953] [runtime] Integrate new snapshot checkpoint coordinator with jobgraph and execution graph [FLINK-1953] [runtime] Integrate new snapshot checkpoint coordinator with jobgraph and execution graph This closes #651 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b7f8aa1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b7f8aa1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b7f8aa1 Branch: refs/heads/master Commit: 9b7f8aa121e4a231632296d0809029aca9ebde6a Parents: ff750e6 Author: Stephan Ewen Authored: Thu Apr 30 19:59:36 2015 +0200 Committer: Stephan Ewen Committed: Tue May 12 21:35:57 2015 +0200 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 169 ++++++- .../CheckpointCoordinatorDeActivator.java | 56 +++ .../runtime/checkpoint/PendingCheckpoint.java | 11 +- .../flink/runtime/checkpoint/StateForTask.java | 12 +- .../deployment/TaskDeploymentDescriptor.java | 32 +- .../flink/runtime/execution/Environment.java | 24 +- .../flink/runtime/executiongraph/Execution.java | 20 +- .../runtime/executiongraph/ExecutionGraph.java | 484 +++++++++++-------- .../executiongraph/ExecutionJobVertex.java | 3 +- .../runtime/executiongraph/ExecutionVertex.java | 25 +- .../apache/flink/runtime/jobgraph/JobGraph.java | 44 +- .../jobgraph/tasks/AbstractInvokable.java | 6 +- .../tasks/CheckpointCommittingOperator.java | 24 + .../jobgraph/tasks/CheckpointedOperator.java | 24 + .../jobgraph/tasks/JobSnapshottingSettings.java | 97 ++++ .../jobgraph/tasks/OperatorStateCarrier.java | 16 +- .../messages/checkpoint/AbortCheckpoint.java | 49 -- .../checkpoint/AcknowledgeCheckpoint.java | 8 +- .../messages/checkpoint/ConfirmCheckpoint.java | 28 +- .../flink/runtime/state/LocalStateHandle.java | 27 +- .../apache/flink/runtime/state/StateHandle.java | 11 +- .../apache/flink/runtime/state/StateUtils.java | 54 +++ .../runtime/taskmanager/RuntimeEnvironment.java | 25 +- .../apache/flink/runtime/taskmanager/Task.java | 150 +++++- .../flink/runtime/util/SerializableObject.java | 28 ++ .../flink/runtime/jobmanager/JobManager.scala | 108 +++-- .../StreamCheckpointCoordinator.scala | 151 ------ .../messages/CheckpointingMessages.scala | 52 -- .../flink/runtime/taskmanager/TaskManager.scala | 44 +- .../checkpoint/CheckpointCoordinatorTest.java | 14 +- .../checkpoint/CheckpointStateRestoreTest.java | 235 +++++++++ .../checkpoint/CoordinatorShutdownTest.java | 144 ++++++ .../messages/CheckpointMessagesTest.java | 17 +- .../operators/testutils/MockEnvironment.java | 11 +- .../api/graph/StreamingJobGraphGenerator.java | 70 ++- .../streaming/runtime/tasks/StreamTask.java | 41 +- .../StreamCheckpointingITCase.java | 4 +- .../TaskManagerFailureRecoveryITCase.java | 2 +- 38 files changed, 1630 insertions(+), 690 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 9647ca4..b3f6587 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -18,10 +18,17 @@ package org.apache.flink.runtime.checkpoint; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; + import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; @@ -41,7 +48,10 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** - * + * The checkpoint coordinator coordinates the distributed snapshots of operators and state. + * It triggers the checkpoint by sending the messages to the relevant tasks and collects the + * checkpoint acknowledgements. It also collects and maintains the overview of the state handles + * reported by the tasks that acknowledge the checkpoint. */ public class CheckpointCoordinator { @@ -76,13 +86,17 @@ public class CheckpointCoordinator { private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(); - /** The timer that processes the checkpoint timeouts */ - private final Timer timeoutTimer; + /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */ + private final Timer timer; private final long checkpointTimeout; private final int numSuccessfulCheckpointsToRetain; + private TimerTask periodicScheduler; + + private ActorRef jobStatusListener; + private boolean shutdown; // -------------------------------------------------------------------------------------------- @@ -114,9 +128,13 @@ public class CheckpointCoordinator { this.completedCheckpoints = new ArrayDeque(numSuccessfulCheckpointsToRetain + 1); this.recentPendingCheckpoints = new ArrayDeque(NUM_GHOST_CHECKPOINT_IDS); - timeoutTimer = new Timer("Checkpoint Timeout Handler", true); + timer = new Timer("Checkpoint Timer", true); } + // -------------------------------------------------------------------------------------------- + // Clean shutdown + // -------------------------------------------------------------------------------------------- + /** * Shuts down the checkpoint coordinator. * @@ -129,9 +147,22 @@ public class CheckpointCoordinator { return; } shutdown = true; + LOG.info("Stopping checkpoint coordinator jor job " + job); // shut down the thread that handles the timeouts - timeoutTimer.cancel(); + timer.cancel(); + + // make sure that the actor does not linger + if (jobStatusListener != null) { + jobStatusListener.tell(PoisonPill.getInstance(), ActorRef.noSender()); + jobStatusListener = null; + } + + // the scheduling thread needs also to go away + if (periodicScheduler != null) { + periodicScheduler.cancel(); + periodicScheduler = null; + } // clear and discard all pending checkpoints for (PendingCheckpoint pending : pendingCheckpoints.values()) { @@ -146,6 +177,10 @@ public class CheckpointCoordinator { completedCheckpoints.clear(); } } + + public boolean isShutdown() { + return shutdown; + } // -------------------------------------------------------------------------------------------- // Handling checkpoints and messages @@ -235,7 +270,7 @@ public class CheckpointCoordinator { throw new IllegalStateException("Checkpoint coordinator has been shutdown."); } pendingCheckpoints.put(checkpointID, checkpoint); - timeoutTimer.schedule(canceller, checkpointTimeout); + timer.schedule(canceller, checkpointTimeout); } // send the messages to the tasks that trigger their checkpoint @@ -270,7 +305,8 @@ public class CheckpointCoordinator { } final long checkpointId = message.getCheckpointId(); - boolean checkpointCompleted = false; + + SuccessfulCheckpoint completed = null; synchronized (lock) { // we need to check inside the lock for being shutdown as well, otherwise we @@ -286,7 +322,7 @@ public class CheckpointCoordinator { if (checkpoint.isFullyAcknowledged()) { LOG.info("Completed checkpoint " + checkpointId); - SuccessfulCheckpoint completed = checkpoint.toCompletedCheckpoint(); + completed = checkpoint.toCompletedCheckpoint(); completedCheckpoints.addLast(completed); if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) { completedCheckpoints.removeFirst(); @@ -295,8 +331,6 @@ public class CheckpointCoordinator { rememberRecentCheckpointId(checkpointId); dropSubsumedCheckpoints(completed.getTimestamp()); - - checkpointCompleted = true; } } else { @@ -323,12 +357,13 @@ public class CheckpointCoordinator { // send the confirmation messages to the necessary targets. we do this here // to be outside the lock scope - if (checkpointCompleted) { + if (completed != null) { + final long timestamp = completed.getTimestamp(); for (ExecutionVertex ev : tasksToCommitTo) { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ExecutionAttemptID attemptId = ee.getAttemptId(); - ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId); + ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId, timestamp); ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId()); } } @@ -355,6 +390,64 @@ public class CheckpointCoordinator { } // -------------------------------------------------------------------------------------------- + // Checkpoint State Restoring + // -------------------------------------------------------------------------------------------- + + public void restoreLatestCheckpointedState(Map tasks, + boolean errorIfNoCheckpoint, + boolean allOrNothingState) throws Exception { + synchronized (lock) { + if (shutdown) { + throw new IllegalStateException("CheckpointCoordinator is hut down"); + } + + if (completedCheckpoints.isEmpty()) { + if (errorIfNoCheckpoint) { + throw new IllegalStateException("No completed checkpoint available"); + } else { + return; + } + } + + // restore from the latest checkpoint + SuccessfulCheckpoint latest = completedCheckpoints.getLast(); + + if (allOrNothingState) { + Map stateCounts = new HashMap(); + + for (StateForTask state : latest.getStates()) { + ExecutionJobVertex vertex = tasks.get(state.getOperatorId()); + Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt(); + exec.setInitialState(state.getState()); + + Integer count = stateCounts.get(vertex); + if (count != null) { + stateCounts.put(vertex, count+1); + } else { + stateCounts.put(vertex, 1); + } + } + + // validate that either all task vertices have state, or none + for (Map.Entry entry : stateCounts.entrySet()) { + ExecutionJobVertex vertex = entry.getKey(); + if (entry.getValue() != vertex.getParallelism()) { + throw new IllegalStateException( + "The checkpoint contained state only for a subset of tasks for vertex " + vertex); + } + } + } + else { + for (StateForTask state : latest.getStates()) { + ExecutionJobVertex vertex = tasks.get(state.getOperatorId()); + Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt(); + exec.setInitialState(state.getState()); + } + } + } + } + + // -------------------------------------------------------------------------------------------- // Accessors // -------------------------------------------------------------------------------------------- @@ -377,4 +470,56 @@ public class CheckpointCoordinator { return new ArrayList(this.completedCheckpoints); } } + + // -------------------------------------------------------------------------------------------- + // Periodic scheduling of checkpoints + // -------------------------------------------------------------------------------------------- + + public void startPeriodicCheckpointScheduler(long interval) { + synchronized (lock) { + if (shutdown) { + throw new IllegalArgumentException("Checkpoint coordinator is shut down"); + } + + // cancel any previous scheduler + stopPeriodicCheckpointScheduler(); + + // start a new scheduler + periodicScheduler = new TimerTask() { + @Override + public void run() { + try { + triggerCheckpoint(); + } + catch (Exception e) { + LOG.error("Exception while triggering checkpoint", e); + } + } + }; + timer.scheduleAtFixedRate(periodicScheduler, interval, interval); + } + } + + public void stopPeriodicCheckpointScheduler() { + synchronized (lock) { + if (periodicScheduler != null) { + periodicScheduler.cancel(); + periodicScheduler = null; + } + } + } + + public ActorRef createJobStatusListener(ActorSystem actorSystem, long checkpointInterval) { + synchronized (lock) { + if (shutdown) { + throw new IllegalArgumentException("Checkpoint coordinator is shut down"); + } + + if (jobStatusListener == null) { + Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, checkpointInterval); + jobStatusListener = actorSystem.actorOf(props); + } + return jobStatusListener; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java new file mode 100644 index 0000000..a6c4d76 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import akka.actor.UntypedActor; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.ExecutionGraphMessages; + +/** + * This actor listens to changes in the JobStatus and activates or deactivates the periodic + * checkpoint scheduler. + */ +public class CheckpointCoordinatorDeActivator extends UntypedActor { + + private final CheckpointCoordinator coordinator; + private final long interval; + + public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator, long interval) { + this.coordinator = coordinator; + this.interval = interval; + } + + @Override + public void onReceive(Object message) { + if (message instanceof ExecutionGraphMessages.JobStatusChanged) { + JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus(); + + if (status == JobStatus.RUNNING) { + // start the checkpoint scheduler + coordinator.startPeriodicCheckpointScheduler(interval); + } + else { + // anything else should stop the trigger for now + coordinator.stopPeriodicCheckpointScheduler(); + } + } + + // we ignore all other messages + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index e221238..f25bff9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.SerializedValue; import java.util.ArrayList; import java.util.List; @@ -31,6 +32,9 @@ import java.util.Map; * A pending checkpoint is a checkpoint that has been started, but has not been * acknowledged by all tasks that need to acknowledge it. Once all tasks have * acknowledged it, it becomes a {@link SuccessfulCheckpoint}. + * + *

Note that the pending checkpoint, as well as the successful checkpoint keep the + * state handles always as serialized values, never as actual values.

*/ public class PendingCheckpoint { @@ -117,12 +121,12 @@ public class PendingCheckpoint { return completed; } else { - throw new IllegalStateException("Cannot complete checkpoint while nit all tasks are acknowledged"); + throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged"); } } } - public boolean acknowledgeTask(ExecutionAttemptID attemptID, StateHandle state) { + public boolean acknowledgeTask(ExecutionAttemptID attemptID, SerializedValue> state) { synchronized (lock) { if (discarded) { return false; @@ -158,6 +162,7 @@ public class PendingCheckpoint { @Override public String toString() { - return ""; + return String.format("PendingCheckpoint %d @ %d - confirmed=%d, pending=%d", + checkpointId, checkpointTimestamp, getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java index 83a6dc8..26b3eb7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java @@ -20,16 +20,22 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.SerializedValue; /** * Simple bean to describe the state belonging to a parallel operator. * Since we hold the state across execution attempts, we identify a task by its * JobVertexId and subtask index. + * + * The state itself is kept in serialized from, since the checkpoint coordinator itself + * is never looking at it anyways and only sends it back out in case of a recovery. + * Furthermore, the state may involve user-defined classes that are not accessible without + * the respective classloader. */ public class StateForTask { /** The state of the parallel operator */ - private final StateHandle state; + private final SerializedValue> state; /** The vertex id of the parallel operator */ private final JobVertexID operatorId; @@ -37,7 +43,7 @@ public class StateForTask { /** The index of the parallel subtask */ private final int subtask; - public StateForTask(StateHandle state, JobVertexID operatorId, int subtask) { + public StateForTask(SerializedValue> state, JobVertexID operatorId, int subtask) { if (state == null || operatorId == null || subtask < 0) { throw new IllegalArgumentException(); } @@ -49,7 +55,7 @@ public class StateForTask { // -------------------------------------------------------------------------------------------- - public StateHandle getState() { + public SerializedValue> getState() { return state; } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index 5d96903..0a1268d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.SerializedValue; import java.io.Serializable; import java.util.Collection; @@ -77,9 +78,8 @@ public final class TaskDeploymentDescriptor implements Serializable { /** The list of JAR files required to run this task. */ private final List requiredJarFiles; - private StateHandle operatorStates; - - + private final SerializedValue> operatorState; + /** * Constructs a task deployment descriptor. */ @@ -89,15 +89,18 @@ public final class TaskDeploymentDescriptor implements Serializable { Configuration taskConfiguration, String invokableClassName, List producedPartitions, List inputGates, - List requiredJarFiles, int targetSlotNumber) { + List requiredJarFiles, int targetSlotNumber, + SerializedValue> operatorState) { + checkArgument(indexInSubtaskGroup >= 0); + checkArgument(numberOfSubtasks > indexInSubtaskGroup); + checkArgument(targetSlotNumber >= 0); + this.jobID = checkNotNull(jobID); this.vertexID = checkNotNull(vertexID); this.executionId = checkNotNull(executionId); this.taskName = checkNotNull(taskName); - checkArgument(indexInSubtaskGroup >= 0); this.indexInSubtaskGroup = indexInSubtaskGroup; - checkArgument(numberOfSubtasks > indexInSubtaskGroup); this.numberOfSubtasks = numberOfSubtasks; this.jobConfiguration = checkNotNull(jobConfiguration); this.taskConfiguration = checkNotNull(taskConfiguration); @@ -105,8 +108,8 @@ public final class TaskDeploymentDescriptor implements Serializable { this.producedPartitions = checkNotNull(producedPartitions); this.inputGates = checkNotNull(inputGates); this.requiredJarFiles = checkNotNull(requiredJarFiles); - checkArgument(targetSlotNumber >= 0); this.targetSlotNumber = targetSlotNumber; + this.operatorState = operatorState; } public TaskDeploymentDescriptor( @@ -115,14 +118,11 @@ public final class TaskDeploymentDescriptor implements Serializable { Configuration taskConfiguration, String invokableClassName, List producedPartitions, List inputGates, - List requiredJarFiles, int targetSlotNumber, - StateHandle operatorStates) { + List requiredJarFiles, int targetSlotNumber) { this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, jobConfiguration, taskConfiguration, invokableClassName, producedPartitions, - inputGates, requiredJarFiles, targetSlotNumber); - - setOperatorState(operatorStates); + inputGates, requiredJarFiles, targetSlotNumber, null); } /** @@ -232,11 +232,7 @@ public final class TaskDeploymentDescriptor implements Serializable { return strBuilder.toString(); } - public void setOperatorState(StateHandle operatorStates) { - this.operatorStates = operatorStates; - } - - public StateHandle getOperatorStates() { - return operatorStates; + public SerializedValue> getOperatorState() { + return operatorState; } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 081e3ca..755f1ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.execution; -import akka.actor.ActorRef; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -31,6 +30,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.state.StateHandle; import java.util.Map; import java.util.concurrent.Future; @@ -148,6 +148,25 @@ public interface Environment { */ void reportAccumulators(Map> accumulators); + /** + * Confirms that the invokable has successfully completed all steps it needed to + * to for the checkpoint with the give checkpoint-ID. This method does not include + * any state in the checkpoint. + * + * @param checkpointId The ID of the checkpoint. + */ + void acknowledgeCheckpoint(long checkpointId); + + /** + * Confirms that the invokable has successfully completed all steps it needed to + * to for the checkpoint with the give checkpoint-ID. This method does include + * the given state in the checkpoint. + * + * @param checkpointId The ID of the checkpoint. + * @param state A handle to the state to be included in the checkpoint. + */ + void acknowledgeCheckpoint(long checkpointId, StateHandle state); + // -------------------------------------------------------------------------------------------- // Fields relevant to the I/O system. Should go into Task // -------------------------------------------------------------------------------------------- @@ -159,7 +178,4 @@ public interface Environment { InputGate getInputGate(int index); InputGate[] getAllInputGates(); - - // this should go away - ActorRef getJobManager(); } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 4e046dd..731d70f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.messages.Messages; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.SerializedValue; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -129,7 +130,7 @@ public class Execution implements Serializable { private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution - private StateHandle operatorState; + private SerializedValue> operatorState; // -------------------------------------------------------------------------------------------- @@ -204,6 +205,13 @@ public class Execution implements Serializable { partialInputChannelDeploymentDescriptors = null; } + public void setInitialState(SerializedValue> initialState) { + if (state != ExecutionState.CREATED) { + throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED"); + } + this.operatorState = initialState; + } + // -------------------------------------------------------------------------------------------- // Actions // -------------------------------------------------------------------------------------------- @@ -325,7 +333,7 @@ public class Execution implements Serializable { attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname())); } - final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot); + final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState); // register this execution at the execution graph, to receive call backs vertex.getExecutionGraph().registerExecution(this); @@ -903,12 +911,4 @@ public class Execution implements Serializable { return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(), (assignedResource == null ? "(unassigned)" : assignedResource.toString()), state); } - - public void setOperatorState(StateHandle operatorStates) { - this.operatorState = operatorStates; - } - - public StateHandle getOperatorState() { - return operatorState; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index d38913e..90cf42e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -18,12 +18,14 @@ package org.apache.flink.runtime.executiongraph; -import akka.actor.ActorContext; import akka.actor.ActorRef; + +import akka.actor.ActorSystem; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; @@ -32,20 +34,20 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; -import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; -import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.util.ExceptionUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple3; -import scala.concurrent.duration.Duration; + import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -54,7 +56,6 @@ import java.util.NoSuchElementException; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static akka.dispatch.Futures.future; @@ -80,8 +81,11 @@ import static akka.dispatch.Futures.future; * about deployment of tasks and updates in the task status always use the ExecutionAttemptID to * address the message receiver. * - * - * + * + *

The ExecutionGraph implements {@link java.io.Serializable}, because it can be archived by + * sending it to an archive actor via an actor message. The execution graph does contain some + * non-serializable fields. These fields are not required in the archived form and are cleared + * in the {@link #prepareForArchiving()} method.

*/ public class ExecutionGraph implements Serializable { @@ -92,9 +96,15 @@ public class ExecutionGraph implements Serializable { /** The log object used for debugging. */ static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class); + + private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1; // -------------------------------------------------------------------------------------------- + /** The lock used to secure all access to mutable fields, especially the tracking of progress + * within the job. */ + private final SerializableObject progressLock = new SerializableObject(); + /** The ID of the job this graph has been built for. */ private final JobID jobID; @@ -104,9 +114,6 @@ public class ExecutionGraph implements Serializable { /** The job configuration that was originally attached to the JobGraph. */ private final Configuration jobConfiguration; - /** The classloader for the user code. Needed for calls into user code classes */ - private ClassLoader userClassLoader; - /** All job vertices that are part of this graph */ private final ConcurrentHashMap tasks; @@ -123,8 +130,11 @@ public class ExecutionGraph implements Serializable { * inside the BlobService and are referenced via the BLOB keys. */ private final List requiredJarFiles; + /** Listeners that receive messages when the entire job switches it status (such as from + * RUNNING to FINISHED) */ private final List jobStatusListenerActors; + /** Listeners that receive messages whenever a single task execution changes its status */ private final List executionListenerActors; /** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when @@ -133,10 +143,6 @@ public class ExecutionGraph implements Serializable { * at {@code stateTimestamps[RUNNING.ordinal()]}. */ private final long[] stateTimestamps; - /** The lock used to secure all access to mutable fields, especially the tracking of progress - * within the job. */ - private final Object progressLock = new Object(); - /** The timeout for all messages that require a response/acknowledgement */ private final FiniteDuration timeout; @@ -158,8 +164,11 @@ public class ExecutionGraph implements Serializable { * from results than need to be materialized. */ private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES; + /** Flag that indicate whether the executed dataflow should be periodically snapshotted */ + private boolean snapshotCheckpointsEnabled; + - // ------ Execution status and progress ------- + // ------ Execution status and progress. These values are volatile, and accessed under the lock ------- /** Current status of the job execution */ private volatile JobStatus state = JobStatus.CREATED; @@ -168,31 +177,36 @@ public class ExecutionGraph implements Serializable { * that was not recoverable and triggered job failure */ private volatile Throwable failureCause; - /** The scheduler to use for scheduling new tasks as they are needed */ - private Scheduler scheduler; - /** The position of the vertex that is next expected to finish. * This is an index into the "verticesInCreationOrder" collection. * Once this value has reached the number of vertices, the job is done. */ - private int nextVertexToFinish; - - - - private ActorContext parentContext; - - private ActorRef stateCheckpointerActor; - - private boolean checkpointingEnabled; + private volatile int nextVertexToFinish; + + + // ------ Fields that are relevant to the execution and need to be cleared before archiving ------- - private long checkpointingInterval = 5000; + /** The scheduler to use for scheduling new tasks as they are needed */ + @SuppressWarnings("NonSerializableFieldInSerializableClass") + private Scheduler scheduler; - public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) { - this(jobId, jobName, jobConfig, timeout, new ArrayList()); - } + /** The classloader for the user code. Needed for calls into user code classes */ + @SuppressWarnings("NonSerializableFieldInSerializableClass") + private ClassLoader userClassLoader; + + /** The coordinator for checkpoints, if snapshot checkpoints are enabled */ + @SuppressWarnings("NonSerializableFieldInSerializableClass") + private CheckpointCoordinator checkpointCoordinator; + + + // -------------------------------------------------------------------------------------------- + // Constructors + // -------------------------------------------------------------------------------------------- - public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, - FiniteDuration timeout, List requiredJarFiles) { - this(jobId, jobName, jobConfig, timeout, requiredJarFiles, Thread.currentThread().getContextClassLoader()); + /** + * This constructor is for tests only, because it does not include class loading information. + */ + ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) { + this(jobId, jobName, jobConfig, timeout, new ArrayList(), ExecutionGraph.class.getClassLoader()); } public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout, @@ -224,18 +238,8 @@ public class ExecutionGraph implements Serializable { } // -------------------------------------------------------------------------------------------- - - public void setStateCheckpointerActor(ActorRef stateCheckpointerActor) { - this.stateCheckpointerActor = stateCheckpointerActor; - } - - public ActorRef getStateCheckpointerActor() { - return stateCheckpointerActor; - } - - public void setParentContext(ActorContext parentContext) { - this.parentContext = parentContext; - } + // Configuration of Data-flow wide execution settings + // -------------------------------------------------------------------------------------------- public void setNumberOfRetriesLeft(int numberOfRetriesLeft) { if (numberOfRetriesLeft < -1) { @@ -259,46 +263,97 @@ public class ExecutionGraph implements Serializable { return delayBeforeRetrying; } - public void attachJobGraph(List topologiallySorted) throws JobException { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d " - + "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size())); + public boolean isQueuedSchedulingAllowed() { + return this.allowQueuedScheduling; + } + + public void setQueuedSchedulingAllowed(boolean allowed) { + this.allowQueuedScheduling = allowed; + } + + public void setScheduleMode(ScheduleMode scheduleMode) { + this.scheduleMode = scheduleMode; + } + + public ScheduleMode getScheduleMode() { + return scheduleMode; + } + + public void enableSnaphotCheckpointing(long interval, long checkpointTimeout, + List verticesToTrigger, + List verticesToWaitFor, + List verticesToCommitTo, + ActorSystem actorSystem) + { + // simple sanity checks + if (interval < 10 || checkpointTimeout < 10) { + throw new IllegalArgumentException(); + } + if (state != JobStatus.CREATED) { + throw new IllegalStateException("Job must be in CREATED state"); } + + ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger); + ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor); + ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo); - final long createTimestamp = System.currentTimeMillis(); + // disable to make sure existing checkpoint coordinators are cleared + disableSnaphotCheckpointing(); - for (AbstractJobVertex jobVertex : topologiallySorted) { - - // create the execution job vertex and attach it to the graph - ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp); - ejv.connectToPredecessors(this.intermediateResults); - - ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv); - if (previousTask != null) { - throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", - jobVertex.getID(), ejv, previousTask)); - } - - for (IntermediateResult res : ejv.getProducedDataSets()) { - IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res); - if (previousDataSet != null) { - throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", - res.getId(), res, previousDataSet)); - } - } - - this.verticesInCreationOrder.add(ejv); + // create the coordinator that triggers and commits checkpoints and holds the state + snapshotCheckpointsEnabled = true; + checkpointCoordinator = new CheckpointCoordinator(jobID, NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, + checkpointTimeout, tasksToTrigger, tasksToWaitFor, tasksToCommitTo); + + // the periodic checkpoint scheduler is activated and deactivated as a result of + // job status changes (running -> on, all other states -> off) + registerJobStatusListener(checkpointCoordinator.createJobStatusListener(actorSystem, interval)); + } + + public void disableSnaphotCheckpointing() { + if (state != JobStatus.CREATED) { + throw new IllegalStateException("Job must be in CREATED state"); + } + + snapshotCheckpointsEnabled = false; + if (checkpointCoordinator != null) { + checkpointCoordinator.shutdown(); + checkpointCoordinator = null; } } + + public boolean isSnapshotCheckpointsEnabled() { + return snapshotCheckpointsEnabled; + } - public void setCheckpointingEnabled(boolean checkpointingEnabled) { - this.checkpointingEnabled = checkpointingEnabled; + public CheckpointCoordinator getCheckpointCoordinator() { + return checkpointCoordinator; } - public void setCheckpointingInterval(long checkpointingInterval) { - this.checkpointingInterval = checkpointingInterval; + private ExecutionVertex[] collectExecutionVertices(List jobVertices) { + if (jobVertices.size() == 1) { + ExecutionJobVertex jv = jobVertices.get(0); + if (jv.getGraph() != this) { + throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph"); + } + return jv.getTaskVertices(); + } + else { + ArrayList all = new ArrayList(); + for (ExecutionJobVertex jv : jobVertices) { + if (jv.getGraph() != this) { + throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph"); + } + all.addAll(Arrays.asList(jv.getTaskVertices())); + } + return all.toArray(new ExecutionVertex[all.size()]); + } } + // -------------------------------------------------------------------------------------------- + // Properties and Status of the Execution Graph + // -------------------------------------------------------------------------------------------- + /** * Returns a list of BLOB keys referring to the JAR files required to run this job * @return list of BLOB keys referring to the JAR files required to run this job @@ -307,8 +362,6 @@ public class ExecutionGraph implements Serializable { return this.requiredJarFiles; } - // -------------------------------------------------------------------------------------------- - public Scheduler getScheduler() { return scheduler; } @@ -396,26 +449,42 @@ public class ExecutionGraph implements Serializable { return this.stateTimestamps[status.ordinal()]; } - public boolean isQueuedSchedulingAllowed() { - return this.allowQueuedScheduling; - } + // -------------------------------------------------------------------------------------------- + // Actions + // -------------------------------------------------------------------------------------------- - public void setQueuedSchedulingAllowed(boolean allowed) { - this.allowQueuedScheduling = allowed; - } + public void attachJobGraph(List topologiallySorted) throws JobException { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d " + + "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size())); + } - public void setScheduleMode(ScheduleMode scheduleMode) { - this.scheduleMode = scheduleMode; - } + final long createTimestamp = System.currentTimeMillis(); - public ScheduleMode getScheduleMode() { - return scheduleMode; - } + for (AbstractJobVertex jobVertex : topologiallySorted) { - // -------------------------------------------------------------------------------------------- - // Actions - // -------------------------------------------------------------------------------------------- + // create the execution job vertex and attach it to the graph + ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp); + ejv.connectToPredecessors(this.intermediateResults); + ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv); + if (previousTask != null) { + throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", + jobVertex.getID(), ejv, previousTask)); + } + + for (IntermediateResult res : ejv.getProducedDataSets()) { + IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res); + if (previousDataSet != null) { + throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", + res.getId(), res, previousDataSet)); + } + } + + this.verticesInCreationOrder.add(ejv); + } + } + public void scheduleForExecution(Scheduler scheduler) throws JobException { if (scheduler == null) { throw new IllegalArgumentException("Scheduler must not be null."); @@ -431,32 +500,24 @@ public class ExecutionGraph implements Serializable { switch (scheduleMode) { case FROM_SOURCES: - // initially, we simply take the ones without inputs. - // next, we implement the logic to go back from vertices that need computation - // to the ones we need to start running + // simply take the vertices without inputs. for (ExecutionJobVertex ejv : this.tasks.values()) { if (ejv.getJobVertex().isInputVertex()) { ejv.scheduleAll(scheduler, allowQueuedScheduling); } } - break; case ALL: for (ExecutionJobVertex ejv : getVerticesTopologically()) { ejv.scheduleAll(scheduler, allowQueuedScheduling); } - break; case BACKTRACKING: + // go back from vertices that need computation to the ones we need to run throw new JobException("BACKTRACKING is currently not supported as schedule mode."); } - - if (checkpointingEnabled) { - stateCheckpointerActor = StreamCheckpointCoordinator.spawn(parentContext, this, - Duration.create(checkpointingInterval, TimeUnit.MILLISECONDS)); - } } else { throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED); @@ -508,6 +569,83 @@ public class ExecutionGraph implements Serializable { } } + public void restart() { + try { + if (state == JobStatus.FAILED) { + if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) { + throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart."); + } + } + + synchronized (progressLock) { + if (state != JobStatus.RESTARTING) { + throw new IllegalStateException("Can only restart job from state restarting."); + } + if (scheduler == null) { + throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null."); + } + + this.currentExecutions.clear(); + + for (ExecutionJobVertex jv : this.verticesInCreationOrder) { + jv.resetForNewExecution(); + } + + for (int i = 0; i < stateTimestamps.length; i++) { + stateTimestamps[i] = 0; + } + nextVertexToFinish = 0; + transitionState(JobStatus.RESTARTING, JobStatus.CREATED); + + // if we have checkpointed state, reload it into the executions + if (checkpointCoordinator != null) { + checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false); + } + } + + scheduleForExecution(scheduler); + } + catch (Throwable t) { + fail(t); + } + } + + /** + * This method cleans fields that are irrelevant for the archived execution attempt. + */ + public void prepareForArchiving() { + if (!state.isTerminalState()) { + throw new IllegalStateException("Can only archive the job from a terminal state"); + } + + // clear the non-serializable fields + userClassLoader = null; + scheduler = null; + checkpointCoordinator = null; + + for (ExecutionJobVertex vertex : verticesInCreationOrder) { + vertex.prepareForArchiving(); + } + + intermediateResults.clear(); + currentExecutions.clear(); + requiredJarFiles.clear(); + jobStatusListenerActors.clear(); + executionListenerActors.clear(); + } + + /** + * For testing: This waits until the job execution has finished. + * @throws InterruptedException + */ + public void waitUntilFinished() throws InterruptedException { + synchronized (progressLock) { + while (nextVertexToFinish < verticesInCreationOrder.size()) { + progressLock.wait(); + } + } + } + private boolean transitionState(JobStatus current, JobStatus newState) { return transitionState(current, newState, null); } @@ -551,24 +689,32 @@ public class ExecutionGraph implements Serializable { if (nextPos == verticesInCreationOrder.size()) { // we are done, transition to the final state - + JobStatus current; while (true) { - JobStatus current = this.state; - if (current == JobStatus.RUNNING && transitionState(current, JobStatus.FINISHED)) { - break; + current = this.state; + + if (current == JobStatus.RUNNING) { + if (transitionState(current, JobStatus.FINISHED)) { + postRunCleanup(); + break; + } } - if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) { - break; + else if (current == JobStatus.CANCELLING) { + if (transitionState(current, JobStatus.CANCELED)) { + postRunCleanup(); + break; + } } - if (current == JobStatus.FAILING) { + else if (current == JobStatus.FAILING) { if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) { numberOfRetriesLeft--; future(new Callable() { @Override public Object call() throws Exception { - try{ + try { Thread.sleep(delayBeforeRetrying); - }catch(InterruptedException e){ + } + catch(InterruptedException e){ // should only happen on shutdown } restart(); @@ -578,13 +724,15 @@ public class ExecutionGraph implements Serializable { break; } else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) { + postRunCleanup(); break; } } - if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) { + else { fail(new Exception("ExecutionGraph went into final state from state " + current)); } } + // done transitioning the state // also, notify waiters progressLock.notifyAll(); @@ -592,6 +740,19 @@ public class ExecutionGraph implements Serializable { } } } + + private void postRunCleanup() { + try { + CheckpointCoordinator coord = this.checkpointCoordinator; + this.checkpointCoordinator = null; + if (coord != null) { + coord.shutdown(); + } + } + catch (Exception e) { + LOG.error("Error while cleaning up after execution", e); + } + } // -------------------------------------------------------------------------------------------- // Callbacks and Callback Utilities @@ -623,13 +784,6 @@ public class ExecutionGraph implements Serializable { return false; } } - - public void loadOperatorStates(Map , StateHandle> states) { - synchronized (this.progressLock) { - for (Map.Entry, StateHandle> state : states.entrySet()) - tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue()); - } - } public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) { @@ -670,21 +824,19 @@ public class ExecutionGraph implements Serializable { // Listeners & Observers // -------------------------------------------------------------------------------------------- - public void registerJobStatusListener(ActorRef listener){ - this.jobStatusListenerActors.add(listener); - } - - public void registerExecutionListener(ActorRef listener){ - this.executionListenerActors.add(listener); + public void registerJobStatusListener(ActorRef listener) { + if (listener != null) { + this.jobStatusListenerActors.add(listener); + } } - public boolean containsJobStatusListener(ActorRef listener) { - return this.jobStatusListenerActors.contains(listener); + public void registerExecutionListener(ActorRef listener) { + if (listener != null) { + this.executionListenerActors.add(listener); + } } - - /** - * NOTE: This method never throws an error, only logs errors caused by the notified listeners. - */ + + private void notifyJobStatusChange(JobStatus newState, Throwable error) { if (jobStatusListenerActors.size() > 0) { ExecutionGraphMessages.JobStatusChanged message = @@ -695,10 +847,7 @@ public class ExecutionGraph implements Serializable { } } } - - /** - * NOTE: This method never throws an error, only logs errors caused by the notified listeners. - */ + void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState newExecutionState, Throwable error) { @@ -722,65 +871,4 @@ public class ExecutionGraph implements Serializable { fail(error); } } - - public void restart() { - try { - if (state == JobStatus.FAILED) { - if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) { - throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart."); - } - } - - synchronized (progressLock) { - if (state != JobStatus.RESTARTING) { - throw new IllegalStateException("Can only restart job from state restarting."); - } - if (scheduler == null) { - throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null."); - } - - this.currentExecutions.clear(); - - for (ExecutionJobVertex jv : this.verticesInCreationOrder) { - jv.resetForNewExecution(); - } - - for (int i = 0; i < stateTimestamps.length; i++) { - stateTimestamps[i] = 0; - } - nextVertexToFinish = 0; - transitionState(JobStatus.RESTARTING, JobStatus.CREATED); - } - - scheduleForExecution(scheduler); - } - catch (Throwable t) { - fail(t); - } - } - - /** - * This method cleans fields that are irrelevant for the archived execution attempt. - */ - public void prepareForArchiving() { - if (!state.isTerminalState()) { - throw new IllegalStateException("Can only archive the job from a terminal state"); - } - - userClassLoader = null; - - for (ExecutionJobVertex vertex : verticesInCreationOrder) { - vertex.prepareForArchiving(); - } - - intermediateResults.clear(); - currentExecutions.clear(); - requiredJarFiles.clear(); - jobStatusListenerActors.clear(); - executionListenerActors.clear(); - - scheduler = null; - parentContext = null; - stateCheckpointerActor = null; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index acbc17a..59b3bb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.util.SerializableObject; import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; @@ -52,7 +53,7 @@ public class ExecutionJobVertex implements Serializable { /** Use the same log for all ExecutionGraph classes */ private static final Logger LOG = ExecutionGraph.LOG; - private final Object stateMonitor = new Object(); + private final SerializableObject stateMonitor = new SerializableObject(); private final ExecutionGraph graph; http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index a44fc6f..2ad3a55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import akka.actor.ActorRef; + import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; @@ -41,8 +42,11 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; + import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.SerializedValue; import org.slf4j.Logger; + import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; @@ -95,8 +99,6 @@ public class ExecutionVertex implements Serializable { private volatile boolean scheduleLocalOnly; - private StateHandle operatorState; - // -------------------------------------------------------------------------------------------- public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, @@ -212,14 +214,6 @@ public class ExecutionVertex implements Serializable { public InstanceConnectionInfo getCurrentAssignedResourceLocation() { return currentExecution.getAssignedResourceLocation(); } - - public void setOperatorState(StateHandle operatorState) { - this.operatorState = operatorState; - } - - public StateHandle getOperatorState() { - return operatorState; - } public ExecutionGraph getExecutionGraph() { return this.jobVertex.getGraph(); @@ -421,11 +415,6 @@ public class ExecutionVertex implements Serializable { if (grp != null) { this.locationConstraint = grp.getLocationConstraint(subTaskIndex); } - - if (operatorState != null) { - execution.setOperatorState(operatorState); - } - } else { throw new IllegalStateException("Cannot reset a vertex that is in state " + state); @@ -524,6 +513,7 @@ public class ExecutionVertex implements Serializable { // clear the unnecessary fields in this class this.resultPartitions = null; this.inputEdges = null; + this.locationConstraint = null; this.locationConstraintInstances = null; } @@ -588,10 +578,13 @@ public class ExecutionVertex implements Serializable { /** * Creates a task deployment descriptor to deploy a subtask to the given target slot. + * + * TODO: This should actually be in the EXECUTION */ TaskDeploymentDescriptor createDeploymentDescriptor( ExecutionAttemptID executionId, - SimpleSlot targetSlot) { + SimpleSlot targetSlot, + SerializedValue> operatorState) { // Produced intermediate results List producedPartitions = new ArrayList(resultPartitions.size()); http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 6d895f9..28fa78e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -38,9 +38,18 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; /** - * A job graph represents an entire Flink runtime job. + * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts. + * All programs from higher level APIs are transformed into JobGraphs. + * + *

The JobGraph is a graph of vertices and intermediate results that are connected together to + * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph + * but inside certain special vertices that establish the feedback channel amongst themselves.

+ * + *

The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result + * define the characteristics of the concrete operation and intermediate data.

*/ public class JobGraph implements Serializable { @@ -74,11 +83,12 @@ public class JobGraph implements Serializable { /** flag to enable queued scheduling */ private boolean allowQueuedScheduling; + /** The mode in which the job is scheduled */ private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES; - private boolean checkpointingEnabled = false; + /** The settings for asynchronous snapshotting */ + private JobSnapshottingSettings snapshotSettings; - private long checkpointingInterval = 10000; // -------------------------------------------------------------------------------------------- @@ -258,20 +268,24 @@ public class JobGraph implements Serializable { return this.taskVertices.size(); } - public void setCheckpointingEnabled(boolean checkpointingEnabled) { - this.checkpointingEnabled = checkpointingEnabled; - } - - public boolean isCheckpointingEnabled() { - return checkpointingEnabled; - } - - public void setCheckpointingInterval(long checkpointingInterval) { - this.checkpointingInterval = checkpointingInterval; + /** + * Sets the settings for asynchronous snapshots. A value of {@code null} means that + * snapshotting is not enabled. + * + * @param settings The snapshot settings, or null, to disable snapshotting. + */ + public void setSnapshotSettings(JobSnapshottingSettings settings) { + this.snapshotSettings = settings; } - public long getCheckpointingInterval() { - return checkpointingInterval; + /** + * Gets the settings for asynchronous snapshots. This method returns null, when + * snapshotting is not enabled. + * + * @return The snapshot settings, or null, if snapshotting is not enabled. + */ + public JobSnapshottingSettings getSnapshotSettings() { + return snapshotSettings; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index 445c842..1cf5db2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -40,7 +40,7 @@ public abstract class AbstractInvokable { /** The environment assigned to this invokable. */ - private volatile Environment environment; + private Environment environment; /** The execution config, cached from the deserialization from the JobConfiguration */ private ExecutionConfig executionConfig; @@ -66,14 +66,14 @@ public abstract class AbstractInvokable { * @param environment * the environment of this task */ - public final void setEnvironment(final Environment environment) { + public final void setEnvironment(Environment environment) { this.environment = environment; } /** * Returns the environment of this task. * - * @return the environment of this task or null if the environment has not yet been set + * @return The environment of this task. */ public Environment getEnvironment() { return this.environment; http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java new file mode 100644 index 0000000..69cb1f8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + +public interface CheckpointCommittingOperator { + + void confirmCheckpoint(long checkpointId, long timestamp); +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java new file mode 100644 index 0000000..d07b07e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + +public interface CheckpointedOperator { + + void triggerCheckpoint(long checkpointId, long timestamp); +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java new file mode 100644 index 0000000..86c9b60 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.List; + +/** + * The JobSnapshottingSettings are attached to a JobGraph and describe the settings + * for the asynchronous snapshotting of the JobGraph, such as interval, and which vertices + * need to participate. + */ +public class JobSnapshottingSettings implements java.io.Serializable{ + + private static final long serialVersionUID = -2593319571078198180L; + + /** The default time in which pending checkpoints need to be acknowledged before timing out */ + public static final long DEFAULT_SNAPSHOT_TIMEOUT = 10 * 60 * 1000; // 10 minutes + + private final List verticesToTrigger; + + private final List verticesToAcknowledge; + + private final List verticesToConfirm; + + private final long checkpointInterval; + + private final long checkpointTimeout; + + + public JobSnapshottingSettings(List verticesToTrigger, + List verticesToAcknowledge, + List verticesToConfirm, + long checkpointInterval) + { + this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm, checkpointInterval, DEFAULT_SNAPSHOT_TIMEOUT); + } + + public JobSnapshottingSettings(List verticesToTrigger, + List verticesToAcknowledge, + List verticesToConfirm, + long checkpointInterval, long checkpointTimeout) + { + this.verticesToTrigger = verticesToTrigger; + this.verticesToAcknowledge = verticesToAcknowledge; + this.verticesToConfirm = verticesToConfirm; + this.checkpointInterval = checkpointInterval; + this.checkpointTimeout = checkpointTimeout; + } + + // -------------------------------------------------------------------------------------------- + + public List getVerticesToTrigger() { + return verticesToTrigger; + } + + public List getVerticesToAcknowledge() { + return verticesToAcknowledge; + } + + public List getVerticesToConfirm() { + return verticesToConfirm; + } + + public long getCheckpointInterval() { + return checkpointInterval; + } + + public long getCheckpointTimeout() { + return checkpointTimeout; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public String toString() { + return String.format("SnapshotSettings: interval=%d, timeout=%d, trigger=%s, ack=%s, commit=%s", + checkpointInterval, checkpointTimeout, verticesToTrigger, verticesToAcknowledge, verticesToConfirm); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java index 670dc3f..576edb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java @@ -21,12 +21,18 @@ package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.runtime.state.StateHandle; /** - * This is an interface meant to be implemented by any invokable that has to support state recovery. - * It is mainly used by the TaskManager to identify operators that support state recovery in order - * to inject their initial state upon creation. + * This interface must be implemented by any invokable that has recoverable state. + * The method {@link #setInitialState(org.apache.flink.runtime.state.StateHandle)} is used + * to set the initial state of the operator, upon recovery. */ -public interface OperatorStateCarrier { +public interface OperatorStateCarrier> { - public void injectState(StateHandle stateHandle); + /** + * Sets the initial state of the operator, upon recovery. The initial state is typically + * a snapshot of the state from a previous execution. + * + * @param stateHandle The handle to the state. + */ + public void setInitialState(T stateHandle); } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java deleted file mode 100644 index 0493ba6..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.messages.checkpoint; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; - -/** - * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the - * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task that the checkpoint - * has been confirmed and that the task can commit the checkpoint to the outside world. - */ -public class AbortCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable { - - private static final long serialVersionUID = 2094094662279578953L; - - public AbortCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) { - super(job, taskExecutionId, checkpointId); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean equals(Object o) { - return this == o || ( (o instanceof AbortCheckpoint) && super.equals(o)); - } - - @Override - public String toString() { - return String.format("AbortCheckpoint %d for (%s/%s)", - getCheckpointId(), getJob(), getTaskExecutionId()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java index dd94e37..db12e0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.messages.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.SerializedValue; /** * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the @@ -33,18 +34,19 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements private static final long serialVersionUID = -7606214777192401493L; - private final StateHandle state; + private final SerializedValue> state; public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) { this(job, taskExecutionId, checkpointId, null); } - public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, StateHandle state) { + public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, + SerializedValue> state) { super(job, taskExecutionId, checkpointId); this.state = state; } - public StateHandle getState() { + public SerializedValue> getState() { return state; } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java index cdfd202..d3a4374 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java @@ -30,15 +30,39 @@ public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java private static final long serialVersionUID = 2094094662279578953L; - public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) { + /** The timestamp associated with the checkpoint */ + private final long timestamp; + + public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) { super(job, taskExecutionId, checkpointId); + this.timestamp = timestamp; } // -------------------------------------------------------------------------------------------- + public long getTimestamp() { + return timestamp; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32)); + } + @Override public boolean equals(Object o) { - return this == o || ( (o instanceof ConfirmCheckpoint) && super.equals(o)); + if (this == o) { + return true; + } + else if (o instanceof ConfirmCheckpoint) { + ConfirmCheckpoint that = (ConfirmCheckpoint) o; + return this.timestamp == that.timestamp && super.equals(o); + } + else { + return false; + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java index 98712b5..f47b054 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java @@ -18,36 +18,23 @@ package org.apache.flink.runtime.state; -import org.apache.flink.util.InstantiationUtil; - -import java.io.IOException; import java.util.Map; /** - * A StateHandle that includes a copy of the state itself. This state handle is recommended for - * cases where the operatorState is lightweight enough to pass throughout the network. - * - * State is kept in a byte[] because it may contain userclasses, which akka is not able to handle. + * A StateHandle that includes a map of operator states directly. */ -public class LocalStateHandle implements StateHandle{ +public class LocalStateHandle implements StateHandle>> { + + private static final long serialVersionUID = 2093619217898039610L; - transient private Map> stateMap; - private final byte[] state; + private final Map> stateMap; - public LocalStateHandle(Map> state) throws IOException { + public LocalStateHandle(Map> state) { this.stateMap = state; - this.state = InstantiationUtil.serializeObject(state); } @Override - public Map> getState(ClassLoader usercodeClassloader) { - if(stateMap == null) { - try { - stateMap = (Map>) InstantiationUtil.deserializeObject(this.state, usercodeClassloader); - } catch (Exception e) { - throw new RuntimeException("Error while deserializing the state", e); - } - } + public Map> getState() { return stateMap; } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java index 1852ce8..409383c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java @@ -19,20 +19,19 @@ package org.apache.flink.runtime.state; import java.io.Serializable; -import java.util.Map; /** * StateHandle is a general handle interface meant to abstract operator state fetching. * A StateHandle implementation can for example include the state itself in cases where the state * is lightweight or fetching it lazily from some external storage when the state is too large. */ -public interface StateHandle extends Serializable { +public interface StateHandle extends Serializable { /** - * getState should retrieve and return the state managed the handle. + * This retrieves and return the state represented by the handle. * - * @return + * @return The state represented by the handle. + * @throws java.lang.Exception Thrown, if the state cannot be fetched. */ - public Map> getState(ClassLoader userClassloader); - + T getState() throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java new file mode 100644 index 0000000..2cdfef3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier; + +/** + * A collection of utility methods for dealing with operator state. + */ +public class StateUtils { + + /** + * Utility method to define a common generic bound to be used for setting a generic state + * handle on a generic state carrier. + * + * This has no impact on runtime, since internally, it performs + * unchecked casts. The purpose is merely to allow the use of generic interfaces without resorting + * to raw types, by giving the compiler a common type bound. + * + * @param op The state carrier operator. + * @param state The state handle. + * @param Type bound for the + */ + public static > void setOperatorState(OperatorStateCarrier op, StateHandle state) { + @SuppressWarnings("unchecked") + OperatorStateCarrier typedOp = (OperatorStateCarrier) op; + @SuppressWarnings("unchecked") + T typedHandle = (T) state; + + typedOp.setInitialState(typedHandle); + } + + + // ------------------------------------------------------------------------ + + /** Do not instantiate */ + private StateUtils() {} +}