Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4A3DE18E73 for ; Tue, 12 May 2015 21:03:13 +0000 (UTC) Received: (qmail 86867 invoked by uid 500); 12 May 2015 21:03:13 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 86764 invoked by uid 500); 12 May 2015 21:03:13 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 86623 invoked by uid 99); 12 May 2015 21:03:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 May 2015 21:03:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DE896E17E2; Tue, 12 May 2015 21:03:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Tue, 12 May 2015 21:03:16 -0000 Message-Id: <44f795a8a5d14a2781a07b21cf451009@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/10] flink git commit: [streaming] Integrate new checkpointed interface with StreamTask, StreamOperator, and PersistentKafkaSource [streaming] Integrate new checkpointed interface with StreamTask, StreamOperator, and PersistentKafkaSource Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b2ee23b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b2ee23b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b2ee23b Branch: refs/heads/master Commit: 3b2ee23b65544a5f517e77cb55911bacae4117c6 Parents: ededb6b Author: Stephan Ewen Authored: Tue May 5 16:08:50 2015 +0200 Committer: Stephan Ewen Committed: Tue May 12 21:35:58 2015 +0200 ---------------------------------------------------------------------- .../jobgraph/tasks/BarrierTransceiver.java | 43 ------ .../tasks/CheckpointCommittingOperator.java | 2 +- .../jobgraph/tasks/CheckpointedOperator.java | 2 +- .../jobgraph/tasks/OperatorStateCarrier.java | 2 +- .../flink/runtime/state/LocalStateHandle.java | 14 +- .../apache/flink/runtime/state/StateUtils.java | 4 +- .../apache/flink/runtime/taskmanager/Task.java | 19 +-- .../checkpoint/CheckpointStateRestoreTest.java | 7 +- .../kafka/api/simple/PersistentKafkaSource.java | 27 ++-- .../streaming/api/checkpoint/Checkpointed.java | 20 ++- .../streaming/api/operators/StreamOperator.java | 53 ++++++- .../streaming/runtime/tasks/OutputHandler.java | 4 +- .../streaming/runtime/tasks/StreamTask.java | 150 ++++++++++--------- .../runtime/tasks/StreamingRuntimeContext.java | 60 +------- .../runtime/tasks/StreamingSuperstep.java | 37 ++++- .../streaming/runtime/io/BarrierBufferTest.java | 4 +- .../ProcessFailureStreamingRecoveryITCase.java | 31 ++-- 17 files changed, 229 insertions(+), 250 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java deleted file mode 100644 index 1dd6a90..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobgraph.tasks; - - -import java.io.IOException; - -/** - * A BarrierTransceiver describes an operator's barrier checkpointing behavior used for - * fault tolerance. In the most common case [[broadcastBarrier]] is being expected to be called - * periodically upon receiving a checkpoint barrier. Furthermore, a [[confirmBarrier]] method should - * be implemented and used for acknowledging a specific checkpoint checkpoint. - */ -public interface BarrierTransceiver { - - /** - * A callback for notifying an operator of a new checkpoint barrier. - * @param barrierID - */ - public void broadcastBarrierFromSource(long barrierID); - - /** - * A callback for confirming that a barrier checkpoint is complete - * @param barrierID - */ - public void confirmBarrier(long barrierID) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java index 69cb1f8..be203d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java @@ -20,5 +20,5 @@ package org.apache.flink.runtime.jobgraph.tasks; public interface CheckpointCommittingOperator { - void confirmCheckpoint(long checkpointId, long timestamp); + void confirmCheckpoint(long checkpointId, long timestamp) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java index d07b07e..17ba947 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java @@ -20,5 +20,5 @@ package org.apache.flink.runtime.jobgraph.tasks; public interface CheckpointedOperator { - void triggerCheckpoint(long checkpointId, long timestamp); + void triggerCheckpoint(long checkpointId, long timestamp) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java index 576edb6..fb5e63f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java @@ -33,6 +33,6 @@ public interface OperatorStateCarrier> { * * @param stateHandle The handle to the state. */ - public void setInitialState(T stateHandle); + public void setInitialState(T stateHandle) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java index f47b054..fa0c515 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java @@ -18,23 +18,23 @@ package org.apache.flink.runtime.state; -import java.util.Map; +import java.io.Serializable; /** * A StateHandle that includes a map of operator states directly. */ -public class LocalStateHandle implements StateHandle>> { +public class LocalStateHandle implements StateHandle { private static final long serialVersionUID = 2093619217898039610L; - private final Map> stateMap; + private final Serializable state; - public LocalStateHandle(Map> state) { - this.stateMap = state; + public LocalStateHandle(Serializable state) { + this.state = state; } @Override - public Map> getState() { - return stateMap; + public Serializable getState() { + return state; } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java index 2cdfef3..fbd76ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java @@ -37,7 +37,9 @@ public class StateUtils { * @param state The state handle. * @param Type bound for the */ - public static > void setOperatorState(OperatorStateCarrier op, StateHandle state) { + public static > void setOperatorState(OperatorStateCarrier op, StateHandle state) + throws Exception + { @SuppressWarnings("unchecked") OperatorStateCarrier typedOp = (OperatorStateCarrier) op; @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 1578e4b..8630edf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -45,7 +45,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator; import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator; import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier; @@ -57,6 +56,7 @@ import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateUtils; import org.apache.flink.runtime.util.SerializedValue; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -872,23 +872,6 @@ public class Task implements Runnable { }; executeAsyncCallRunnable(runnable, "Checkpoint Trigger"); } - else if (invokable instanceof BarrierTransceiver) { - final BarrierTransceiver barrierTransceiver = (BarrierTransceiver) invokable; - final Logger logger = LOG; - - Runnable runnable = new Runnable() { - @Override - public void run() { - try { - barrierTransceiver.broadcastBarrierFromSource(checkpointID); - } - catch (Throwable t) { - logger.error("Error while triggering checkpoint barriers", t); - } - } - }; - executeAsyncCallRunnable(runnable, "Checkpoint Trigger"); - } else { LOG.error("Task received a checkpoint request, but is not a checkpointing task - " + taskNameWithSubtask); http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 51c4890..e1cf061 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -27,15 +27,14 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.LocalStateHandle; -import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.runtime.util.SerializedValue; import org.junit.Test; import org.mockito.Mockito; -import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -52,7 +51,7 @@ public class CheckpointStateRestoreTest { public void testSetState() { try { final SerializedValue> serializedState = new SerializedValue>( - new LocalStateHandle(Collections.>emptyMap())); + new LocalStateHandle(new SerializableObject())); final JobID jid = new JobID(); final JobVertexID statefulId = new JobVertexID(); @@ -121,7 +120,7 @@ public class CheckpointStateRestoreTest { public void testStateOnlyPartiallyAvailable() { try { final SerializedValue> serializedState = new SerializedValue>( - new LocalStateHandle(Collections.>emptyMap())); + new LocalStateHandle(new SerializableObject())); final JobID jid = new JobID(); final JobVertexID statefulId = new JobVertexID(); http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java index 2af8fb2..a842160 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java @@ -28,6 +28,7 @@ import com.google.common.base.Preconditions; import kafka.consumer.ConsumerConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.ConnectorSource; import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator; @@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory; * @param * Type of the messages on the topic. */ -public class PersistentKafkaSource extends ConnectorSource { +public class PersistentKafkaSource extends ConnectorSource implements Checkpointed> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class); @@ -202,13 +203,13 @@ public class PersistentKafkaSource extends ConnectorSource { if (indexOfSubtask >= numberOfPartitions) { LOG.info("Creating idle consumer because this subtask ({}) is higher than the number partitions ({})", indexOfSubtask + 1, numberOfPartitions); iterator = new KafkaIdleConsumerIterator(); - } else { - if (context.containsState("kafka")) { + } + else { + if (partitionOffsets != null) { + // we have restored state LOG.info("Initializing PersistentKafkaSource from existing state."); - kafkaOffSetOperatorState = (OperatorState>) context.getState("kafka"); - - partitionOffsets = kafkaOffSetOperatorState.getState(); - } else { + } + else { LOG.info("No existing state found. Creating new"); partitionOffsets = new HashMap(); @@ -217,8 +218,6 @@ public class PersistentKafkaSource extends ConnectorSource { } kafkaOffSetOperatorState = new OperatorState>(partitionOffsets); - - context.registerState("kafka", kafkaOffSetOperatorState); } iterator = new KafkaMultiplePartitionsIterator(topicId, partitionOffsets, kafkaTopicUtils, this.consumerConfig); @@ -272,4 +271,14 @@ public class PersistentKafkaSource extends ConnectorSource { Properties props = (Properties) in.readObject(); consumerConfig = new ConsumerConfig(props); } + + @Override + public HashMap snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return new HashMap(this.partitionOffsets); + } + + @Override + public void restoreState(HashMap state) { + this.partitionOffsets = state; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java index f491dd3..2cab7a3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.checkpoint; -import org.apache.flink.runtime.state.OperatorState; +import java.io.Serializable; /** * This method must be implemented by functions that have state that needs to be @@ -31,12 +31,14 @@ import org.apache.flink.runtime.state.OperatorState; * continue to work and mutate the state, even while the state snapshot is being accessed, * can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously} * interface.

+ * + * @param The type of the operator state. */ -public interface Checkpointed { +public interface Checkpointed { /** - * Gets the current operator state as a checkpoint. The state must reflect all operations - * from all prior operations if this function. + * Gets the current state of the function of operator. The state must reflect the result of all + * prior invocations to this function. * * @param checkpointId The ID of the checkpoint. * @param checkpointTimestamp The timestamp of the checkpoint, as derived by @@ -49,5 +51,13 @@ public interface Checkpointed { * recovery), or to discard this checkpoint attempt and to continue running * and to try again with the next checkpoint attempt. */ - OperatorState snapshotState(long checkpointId, long checkpointTimestamp) throws Exception; + T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception; + + /** + * Restores the state of the function or operator to that of a previous checkpoint. + * This method is invoked when a function is executed as part of a recovery run. + * * + * @param state The state to be restored. + */ + void restoreState(T state); } http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index a361b7a..f66e394 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -26,6 +26,8 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.runtime.io.IndexedReaderIterator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; @@ -107,16 +109,14 @@ public abstract class StreamOperator implements Serializable { return nextRecord; } catch (IOException e) { if (isRunning) { - throw new RuntimeException("Could not read next record due to: " - + StringUtils.stringifyException(e)); + throw new RuntimeException("Could not read next record", e); } else { // Task already cancelled do nothing return null; } } catch (IllegalStateException e) { if (isRunning) { - throw new RuntimeException("Could not read next record due to: " - + StringUtils.stringifyException(e)); + throw new RuntimeException("Could not read next record", e); } else { // Task already cancelled do nothing return null; @@ -215,4 +215,49 @@ public abstract class StreamOperator implements Serializable { public Function getUserFunction() { return userFunction; } + + // ------------------------------------------------------------------------ + // Checkpoints and Checkpoint Confirmations + // ------------------------------------------------------------------------ + + // NOTE - ALL OF THIS CODE WORKS ONLY FOR THE FIRST OPERATOR IN THE CHAIN + // IT NEEDS TO BE EXTENDED TO SUPPORT CHAINS + + public void restoreInitialState(Serializable state) throws Exception { + if (userFunction instanceof Checkpointed) { + setStateOnFunction(state, userFunction); + } + else { + throw new IllegalStateException("Trying to restore state of a non-checkpointed function"); + } + } + + public Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception { + if (userFunction instanceof Checkpointed) { + return ((Checkpointed) userFunction).snapshotState(checkpointId, timestamp); + } + else { + return null; + } + } + + public void confirmCheckpointCompleted(long checkpointId, long timestamp) throws Exception { + if (userFunction instanceof CheckpointCommitter) { + try { + ((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId); + } + catch (Exception e) { + throw new Exception("Error while confirming checkpoint " + checkpointId + " to the stream function", e); + } + } + } + + private static void setStateOnFunction(Serializable state, Function function) { + @SuppressWarnings("unchecked") + T typedState = (T) state; + @SuppressWarnings("unchecked") + Checkpointed typedFunction = (Checkpointed) function; + + typedFunction.restoreState(typedState); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java index c579c3a..55c0551 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java @@ -88,8 +88,8 @@ public class OutputHandler { this.outerCollector = createChainedCollector(configuration); } - public void broadcastBarrier(long id) throws IOException, InterruptedException { - StreamingSuperstep barrier = new StreamingSuperstep(id); + public void broadcastBarrier(long id, long timestamp) throws IOException, InterruptedException { + StreamingSuperstep barrier = new StreamingSuperstep(id, timestamp); for (StreamOutput streamOutput : outputMap.values()) { streamOutput.broadcastEvent(barrier); } http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index b4a11aa..930c9b4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -18,18 +18,15 @@ package org.apache.flink.streaming.runtime.tasks; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.io.Serializable; import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator; import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator; import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier; import org.apache.flink.runtime.state.LocalStateHandle; -import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.ChainableStreamOperator; @@ -40,16 +37,18 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.apache.flink.util.StringUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StreamTask extends AbstractInvokable implements StreamTaskContext, - OperatorStateCarrier, CheckpointedOperator, CheckpointCommittingOperator, - BarrierTransceiver { + OperatorStateCarrier, CheckpointedOperator, CheckpointCommittingOperator { private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); + private final Object checkpointLock = new Object(); + private static int numTasks; protected StreamConfig configuration; @@ -62,7 +61,6 @@ public class StreamTask extends AbstractInvokable implements StreamTask protected volatile boolean isRunning = false; private StreamingRuntimeContext context; - private Map> states; protected ClassLoader userClassLoader; @@ -90,32 +88,7 @@ public class StreamTask extends AbstractInvokable implements StreamTask protected void initialize() { this.userClassLoader = getUserCodeClassLoader(); this.configuration = new StreamConfig(getTaskConfiguration()); - this.states = new HashMap>(); - this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states); - } - - @Override - public void broadcastBarrierFromSource(long id) { - // Only called at input vertices - if (LOG.isDebugEnabled()) { - LOG.debug("Received barrier from jobmanager: " + id); - } - actOnBarrier(id); - } - - /** - * This method is called to confirm that a barrier has been fully processed. - * It sends an acknowledgment to the jobmanager. In the current version if - * there is user state it also checkpoints the state to the jobmanager. - */ - @Override - public void confirmBarrier(long barrierID) throws IOException { - if (configuration.getStateMonitoring() && !states.isEmpty()) { - getEnvironment().acknowledgeCheckpoint(barrierID, new LocalStateHandle(states)); - } - else { - getEnvironment().acknowledgeCheckpoint(barrierID); - } + this.context = createRuntimeContext(getEnvironment().getTaskName()); } public void setInputsOutputs() { @@ -136,11 +109,10 @@ public class StreamTask extends AbstractInvokable implements StreamTask return instanceID; } - public StreamingRuntimeContext createRuntimeContext(String taskName, - Map> states) { + public StreamingRuntimeContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(), - getExecutionConfig(), states); + getExecutionConfig()); } @Override @@ -272,62 +244,98 @@ public class StreamTask extends AbstractInvokable implements StreamTask return this.superstepListener; } + // ------------------------------------------------------------------------ + // Checkpoint and Restore + // ------------------------------------------------------------------------ + /** - * Method to be called when a barrier is received from all the input - * channels. It should broadcast the barrier to the output operators, - * checkpoint the state and send an ack. - * - * @param id + * Re-injects the user states into the map. Also set the state on the functions. */ - private synchronized void actOnBarrier(long id) { - if (isRunning) { - try { - outputHandler.broadcastBarrier(id); - confirmBarrier(id); - if (LOG.isDebugEnabled()) { - LOG.debug("Superstep " + id + " processed: " + StreamTask.this); - } - } catch (Exception e) { - // Only throw any exception if the vertex is still running - if (isRunning) { - throw new RuntimeException(e); - } - } - } - } - @Override - public String toString() { - return getEnvironment().getTaskNameWithSubtasks(); + public void setInitialState(LocalStateHandle stateHandle) throws Exception { + // here, we later resolve the state handle into the actual state by + // loading the state described by the handle from the backup store + Serializable state = stateHandle.getState(); + streamOperator.restoreInitialState(state); } /** - * Re-injects the user states into the map + * This method is either called directly by the checkpoint coordinator, or called + * when all incoming channels have reported a barrier + * + * @param checkpointId + * @param timestamp + * @throws Exception */ @Override - public void setInitialState(LocalStateHandle stateHandle) { - this.states.putAll(stateHandle.getState()); + public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception { + + synchronized (checkpointLock) { + if (isRunning) { + try { + LOG.info("Starting checkpoint " + checkpointId); + + // first draw the state that should go into checkpoint + LocalStateHandle state; + try { + Serializable userState = streamOperator.getStateSnapshotFromFunction(checkpointId, timestamp); + state = userState == null ? null : new LocalStateHandle(userState); + } + catch (Exception e) { + throw new Exception("Error while drawing snapshot of the user state."); + } + + // now emit the checkpoint barriers + outputHandler.broadcastBarrier(checkpointId, timestamp); + + // now confirm the checkpoint + if (state == null) { + getEnvironment().acknowledgeCheckpoint(checkpointId); + } else { + getEnvironment().acknowledgeCheckpoint(checkpointId, state); + } + } + catch (Exception e) { + if (isRunning) { + throw e; + } + } + } + } } @Override - public void triggerCheckpoint(long checkpointId, long timestamp) { - broadcastBarrierFromSource(checkpointId); + public void confirmCheckpoint(long checkpointId, long timestamp) throws Exception { + // we do nothing here so far. this should call commit on the source function, for example + synchronized (checkpointLock) { + streamOperator.confirmCheckpointCompleted(checkpointId, timestamp); + } } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + @Override - public void confirmCheckpoint(long checkpointId, long timestamp) { - // we do nothing here so far. this should call commit on the source function, for example + public String toString() { + return getEnvironment().getTaskNameWithSubtasks(); } - - + // ------------------------------------------------------------------------ private class SuperstepEventListener implements EventListener { @Override public void onEvent(TaskEvent event) { - actOnBarrier(((StreamingSuperstep) event).getId()); + try { + StreamingSuperstep sStep = (StreamingSuperstep) event; + triggerCheckpoint(sStep.getId(), sStep.getTimestamp()); + } + catch (Exception e) { + throw new RuntimeException( + "Error triggering a checkpoint as the result of receiving checkpoint barrier", e); + } } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java index 8ae2ced..6112e03 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java @@ -18,17 +18,13 @@ package org.apache.flink.streaming.runtime.tasks; -import java.util.Map; - import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.runtime.state.OperatorState; /** * Implementation of the {@link RuntimeContext}, created by runtime stream UDF @@ -37,65 +33,13 @@ import org.apache.flink.runtime.state.OperatorState; public class StreamingRuntimeContext extends RuntimeUDFContext { private final Environment env; - private final Map> operatorStates; + public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig, Map> operatorStates) { + ExecutionConfig executionConfig) { super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader, executionConfig, env.getDistributedCacheEntries()); this.env = env; - this.operatorStates = operatorStates; - } - - /** - * Returns the operator state registered by the given name for the operator. - * - * @param name - * Name of the operator state to be returned. - * @return The operator state. - */ - public OperatorState getState(String name) { - if (operatorStates == null) { - throw new RuntimeException("No state has been registered for this operator."); - } else { - OperatorState state = operatorStates.get(name); - if (state != null) { - return state; - } else { - throw new RuntimeException("No state has been registered for the name: " + name); - } - } - } - - /** - * Returns whether there is a state stored by the given name - */ - public boolean containsState(String name) { - return operatorStates.containsKey(name); - } - - /** - * This is a beta feature

Register an operator state for this - * operator by the given name. This name can be used to retrieve the state - * during runtime using {@link StreamingRuntimeContext#getState(String)}. To - * obtain the {@link StreamingRuntimeContext} from the user-defined function - * use the {@link RichFunction#getRuntimeContext()} method. - * - * @param name - * The name of the operator state. - * @param state - * The state to be registered for this name. - */ - public void registerState(String name, OperatorState state) { - if (state == null) { - throw new RuntimeException("Cannot register null state"); - } else { - if (operatorStates.containsKey(name)) { - throw new RuntimeException("State is already registered"); - } else { - operatorStates.put(name, state); - } - } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java index 4b3419b..f749773 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java @@ -27,34 +27,57 @@ import org.apache.flink.runtime.event.task.TaskEvent; public class StreamingSuperstep extends TaskEvent { protected long id; + protected long timestamp; - public StreamingSuperstep() { + public StreamingSuperstep() {} + public StreamingSuperstep(long id, long timestamp) { + this.id = id; + this.timestamp = timestamp; } - public StreamingSuperstep(long id) { - this.id = id; + public long getId() { + return id; } + public long getTimestamp() { + return id; + } + + // ------------------------------------------------------------------------ + @Override public void write(DataOutputView out) throws IOException { out.writeLong(id); + out.writeLong(timestamp); } @Override public void read(DataInputView in) throws IOException { id = in.readLong(); + timestamp = in.readLong(); } + + // ------------------------------------------------------------------------ - public long getId() { - return id; + @Override + public int hashCode() { + return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32)); } + @Override public boolean equals(Object other) { if (other == null || !(other instanceof StreamingSuperstep)) { return false; - } else { - return ((StreamingSuperstep) other).id == this.id; } + else { + StreamingSuperstep that = (StreamingSuperstep) other; + return that.id == this.id && that.timestamp == this.timestamp; + } + } + + @Override + public String toString() { + return String.format("StreamingSuperstep %d @ %d", id, timestamp); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index b8af4ed..0afe8b5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -33,8 +33,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.streaming.runtime.io.BarrierBuffer; import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep; + import org.junit.Test; public class BarrierBufferTest { @@ -201,7 +201,7 @@ public class BarrierBufferTest { } protected static BufferOrEvent createSuperstep(long id, int channel) { - return new BufferOrEvent(new StreamingSuperstep(id), channel); + return new BufferOrEvent(new StreamingSuperstep(id, System.currentTimeMillis()), channel); } protected static BufferOrEvent createBuffer(int channel) { http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java index aeaad5c..65e39a2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java @@ -22,7 +22,7 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -145,12 +145,15 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur } } - public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction { + public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction + implements Checkpointed { private static final long SLEEP_TIME = 50; private final File coordinateDir; private final long end; + + private long collected; public SleepyDurableGenerateSequence(File coordinateDir, long end) { this.coordinateDir = coordinateDir; @@ -162,23 +165,10 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur public void run(Collector collector) throws Exception { StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); - OperatorState collectedState; - if (context.containsState("collected")) { - collectedState = (OperatorState) context.getState("collected"); - -// if (collected == 0) { -// throw new RuntimeException("The state did not capture a completed checkpoint"); -// } - } - else { - collectedState = new OperatorState(0L); - context.registerState("collected", collectedState); - } final long stepSize = context.getNumberOfParallelSubtasks(); final long congruence = context.getIndexOfThisSubtask(); final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize); - long collected = collectedState.getState(); final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE); boolean checkForProceedFile = true; @@ -196,13 +186,22 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur } collector.collect(collected * stepSize + congruence); - collectedState.update(collected); collected++; } } @Override public void cancel() {} + + @Override + public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return collected; + } + + @Override + public void restoreState(Long state) { + collected = state; + } }