From: sewen@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Wed, 05 Oct 2016 20:29:00 -0000
Message-Id: <35e1f99fd644408ea7c746a62846eebf@git.apache.org>
Subject: [2/2] flink git commit: [FLINK-4744] [streaming api] Followup: Unify names for operator state access methods and comments.

[FLINK-4744] [streaming api] Followup: Unify names for operator state access methods and comments.
Also make JavaSerializer package private, as it is not intended for use as a proper TypeSerializer

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10a42f95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10a42f95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10a42f95

Branch: refs/heads/master
Commit: 10a42f951c5143537c28a0f9df65627e5c632c4b
Parents: 56cba7e
Author: Stephan Ewen
Authored: Wed Oct 5 15:30:26 2016 +0200
Committer: Stephan Ewen
Committed: Wed Oct 5 19:36:13 2016 +0200

----------------------------------------------------------------------
 .../api/common/state/OperatorStateStore.java    |  61 ++++++++++
 .../java/typeutils/runtime/JavaSerializer.java  | 119 ------------------
 .../state/DefaultOperatorStateBackend.java      |   8 +-
 .../flink/runtime/state/JavaSerializer.java     | 122 +++++++++++++++++++
 .../runtime/state/OperatorStateBackend.java     |   2 +
 .../flink/runtime/state/OperatorStateStore.java |  60 ---------
 .../runtime/state/OperatorStateBackendTest.java |  15 ++-
 .../kafka/FlinkKafkaConsumerBase.java           |   6 +-
 .../kafka/FlinkKafkaProducerBase.java           |   2 +-
 .../kafka/AtLeastOnceProducerTest.java          |   1 -
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  14 +--
 .../api/checkpoint/CheckpointedFunction.java    |   4 +-
 .../operators/AbstractUdfStreamOperator.java    |   6 +-
 .../operators/StreamCheckpointedOperator.java   |   5 +-
 .../streaming/runtime/tasks/StreamTask.java     |   2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   4 +-
 16 files changed, 217 insertions(+), 214 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
new file mode 100644
index 0000000..03c11f6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Interface for a backend that manages operator state.
+ */
+public interface OperatorStateStore {
+
+	String DEFAULT_OPERATOR_STATE_NAME = "_default_";
+
+	/**
+	 * Creates a state descriptor of the given name that uses Java serialization to persist the
+	 * state.
+	 *
+	 * This is a simple convenience method. For more flexibility on how state serialization
+	 * should happen, use the {@link #getOperatorState(ListStateDescriptor)} method.
+	 *
+	 * @param stateName The name of state to create
+	 * @return A list state using Java serialization to serialize state objects.
+	 * @throws Exception
+	 */
+	ListState getSerializableListState(String stateName) throws Exception;
+
+	/**
+	 * Creates (or restores) a list state. Each state is registered under a unique name.
+	 * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
+	 *
+	 * @param stateDescriptor The descriptor for this state, providing a name and serializer.
+	 * @param The generic type of the state
+	 *
+	 * @return A list for all state partitions.
+	 * @throws Exception
+	 */
+	ListState getOperatorState(ListStateDescriptor stateDescriptor) throws Exception;
+
+	/**
+	 * Returns a set with the names of all currently registered states.
+	 * @return set of names for all registered states.
+	 */
+	Set getRegisteredStateNames();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
deleted file mode 100644
index 3af7653..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-public class JavaSerializer extends TypeSerializer {
-
-	private static final long serialVersionUID = 1L;
-
-	private final ClassLoader userClassLoader;
-
-	public JavaSerializer() {
-		this(Thread.currentThread().getContextClassLoader());
-	}
-
-	public JavaSerializer(ClassLoader userClassLoader) {
-		this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
-	}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public TypeSerializer duplicate() {
-		return this;
-	}
-
-	@Override
-	public T createInstance() {
-		return null;
-	}
-
-	@Override
-	public T copy(T from) {
-
-		try {
-			return InstantiationUtil.clone(from);
-		} catch (IOException | ClassNotFoundException e) {
-			throw new RuntimeException("Could not copy instance of " + from + '.', e);
-		}
-	}
-
-	@Override
-	public T copy(T from, T reuse) {
-		return copy(from);
-	}
-
-	@Override
-	public int getLength() {
-		return 0;
-	}
-
-	@Override
-	public void serialize(T record, DataOutputView target) throws IOException {
-		InstantiationUtil.serializeObject(new DataOutputViewStream(target), record);
-	}
-
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		try {
-			return InstantiationUtil.deserializeObject(new DataInputViewStream(source), userClassLoader);
-		} catch (ClassNotFoundException e) {
-			throw new IOException("Could not deserialize object.", e);
-		}
-	}
-
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		return deserialize(source);
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		int size = source.readInt();
-		target.writeInt(size);
-		target.write(source, size);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		return obj instanceof JavaSerializer && userClassLoader.equals(((JavaSerializer) obj).userClassLoader);
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof JavaSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return getClass().hashCode();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index af97a3f..b1ab7e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.state;

 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -74,15 +74,15 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	}

 	@Override
-	public ListState getDefaultPartitionableState(String stateName) throws Exception {
-		return getPartitionableState(new ListStateDescriptor<>(stateName, javaSerializer));
+	public ListState getSerializableListState(String stateName) throws Exception {
+		return getOperatorState(new ListStateDescriptor<>(stateName, javaSerializer));
 	}

 	/**
 	 * @see OperatorStateStore
 	 */
 	@Override
-	public ListState getPartitionableState(
+	public ListState getOperatorState(
 			ListStateDescriptor stateDescriptor) throws IOException {

 		Preconditions.checkNotNull(stateDescriptor);

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
new file mode 100644
index 0000000..2eb9595
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+@Internal
+final class JavaSerializer extends TypeSerializer {
+
+	private final ClassLoader userClassLoader;
+
+	public JavaSerializer() {
+		this(Thread.currentThread().getContextClassLoader());
+	}
+
+	public JavaSerializer(ClassLoader userClassLoader) {
+		this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer duplicate() {
+		return this;
+	}
+
+	@Override
+	public T createInstance() {
+		return null;
+	}
+
+	@Override
+	public T copy(T from) {
+
+		try {
+			return InstantiationUtil.clone(from);
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException("Could not copy instance of " + from + '.', e);
+		}
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		return copy(from);
+	}
+
+	@Override
+	public int getLength() {
+		return 0;
+	}
+
+	@Override
+	public void serialize(T record, DataOutputView target) throws IOException {
+		InstantiationUtil.serializeObject(new DataOutputViewStream(target), record);
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		try {
+			return InstantiationUtil.deserializeObject(new DataInputViewStream(source), userClassLoader);
+		} catch (ClassNotFoundException e) {
+			throw new IOException("Could not deserialize object.", e);
+		}
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		int size = source.readInt();
+		target.writeInt(size);
+		target.write(source, size);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof JavaSerializer && userClassLoader.equals(((JavaSerializer) obj).userClassLoader);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof JavaSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return getClass().hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
index 4e980b7..83e6369 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
@@ -18,6 +18,8 @@

 package org.apache.flink.runtime.state;

+import org.apache.flink.api.common.state.OperatorStateStore;
+
 import java.io.Closeable;

 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
deleted file mode 100644
index ceab87f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
-
-import java.io.Serializable;
-import java.util.Set;
-
-/**
- * Interface for a backend that manages partitionable operator state.
- */
-public interface OperatorStateStore {
-
-	String DEFAULT_OPERATOR_STATE_NAME = "";
-
-	/**
-	 * Creates a satte descriptor of the given name that uses {@link JavaSerializer}.
-	 *
-	 * @param stateName The name of state to create
-	 * @return A state descriptor that uses {@link JavaSerializer}
-	 * @throws Exception
-	 */
-	ListState getDefaultPartitionableState(String stateName) throws Exception;
-
-	/**
-	 * Creates (or restores) the partitionable state in this backend. Each state is registered under a unique name.
-	 * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
-	 *
-	 * @param stateDescriptor The descriptr for this state, providing a name and serializer
-	 * @param The generic type of the state
-	 * @return A list for all state partitions.
-	 * @throws Exception
-	 */
-	ListState getPartitionableState(ListStateDescriptor stateDescriptor) throws Exception;
-
-	/**
-	 * Returns a set with the names of all currently registered states.
-	 * @return set of names for all registered states.
-	 */
-	Set getRegisteredStateNames();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index ff1a23d..2db8735 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
@@ -61,7 +60,7 @@ public class OperatorStateBackendTest {
 		OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
 		ListStateDescriptor stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
 		ListStateDescriptor stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
-		ListState listState1 = operatorStateBackend.getPartitionableState(stateDescriptor1);
+		ListState listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
 		assertNotNull(listState1);
 		assertEquals(1, operatorStateBackend.getRegisteredStateNames().size());
 		Iterator it = listState1.get().iterator();
@@ -74,7 +73,7 @@ public class OperatorStateBackendTest {
 		assertEquals(4711, it.next());
 		assertTrue(!it.hasNext());

-		ListState listState2 = operatorStateBackend.getPartitionableState(stateDescriptor2);
+		ListState listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
 		assertNotNull(listState2);
 		assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
 		assertTrue(!it.hasNext());
@@ -88,7 +87,7 @@ public class OperatorStateBackendTest {
 		assertEquals(23, it.next());
 		assertTrue(!it.hasNext());

-		ListState listState1b = operatorStateBackend.getPartitionableState(stateDescriptor1);
+		ListState listState1b = operatorStateBackend.getOperatorState(stateDescriptor1);
 		assertNotNull(listState1b);
 		listState1b.add(123);
 		it = listState1b.get().iterator();
@@ -115,8 +114,8 @@ public class OperatorStateBackendTest {
 		OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
 		ListStateDescriptor stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
 		ListStateDescriptor stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
-		ListState listState1 = operatorStateBackend.getPartitionableState(stateDescriptor1);
-		ListState listState2 = operatorStateBackend.getPartitionableState(stateDescriptor2);
+		ListState listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+		ListState listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);

 		listState1.add(42);
 		listState1.add(4711);
@@ -137,8 +136,8 @@ public class OperatorStateBackendTest {

 		assertEquals(0, operatorStateBackend.getRegisteredStateNames().size());

-		listState1 = operatorStateBackend.getPartitionableState(stateDescriptor1);
-		listState2 = operatorStateBackend.getPartitionableState(stateDescriptor2);
+		listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+		listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);

 		assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index a30341b..8d63345 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -313,7 +313,7 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti
 		this.stateStore = stateStore;

 		ListState offsets =
-				stateStore.getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+				stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);

 		restoreToOffset = new HashMap<>();

@@ -333,7 +333,7 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti
 		} else {
 			ListState listState =
-					stateStore.getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+					stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 			listState.clear();

 			final AbstractFetcher fetcher = this.kafkaFetcher;

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 8b87004..f0975dc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
index 766a107..d2d7fca 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorStateStore;
 import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 45b45f0..373d6ab 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -22,7 +22,7 @@ import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -98,7 +98,7 @@ public class FlinkKafkaConsumerBaseTest {
 		FlinkKafkaConsumerBase consumer = getConsumer(fetcher, new LinkedMap(), false);
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		TestingListState> listState = new TestingListState<>();
-		when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);

 		consumer.prepareSnapshot(17L, 17L);

@@ -121,10 +121,10 @@ public class FlinkKafkaConsumerBaseTest {

 		FlinkKafkaConsumerBase consumer = getConsumer(null, new LinkedMap(), true);

-		when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(expectedState);
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(expectedState);
 		consumer.initializeState(operatorStateStore);

-		when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(listState);
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);

 		consumer.prepareSnapshot(17L, 17L);

@@ -153,7 +153,7 @@ public class FlinkKafkaConsumerBaseTest {
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		TestingListState listState = new TestingListState<>();

-		when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(listState);
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);

 		consumer.initializeState(operatorStateStore);
 		consumer.prepareSnapshot(17L, 17L);
@@ -190,7 +190,7 @@ public class FlinkKafkaConsumerBaseTest {
 		TestingListState listState2 = new TestingListState<>();
 		TestingListState listState3 = new TestingListState<>();

-		when(backend.getDefaultPartitionableState(Matchers.any(String.class))).
+		when(backend.getSerializableListState(Matchers.any(String.class))).
 				thenReturn(listState1, listState1, listState2, listState3);
 		consumer.initializeState(backend);
@@ -252,7 +252,7 @@ public class FlinkKafkaConsumerBaseTest {
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		TestingListState> listState = new TestingListState<>();
-		when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 		// create 500 snapshots
 		for (int i = 100; i < 600; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index 2227201..777cb91 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.checkpoint;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 /**
  *
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.OperatorStateStore;
  * repartitionable state that needs to be checkpointed. Methods from this interface are called upon checkpointing and
  * restoring of state.
  *
- * On #initializeState the implementing class receives the {@link org.apache.flink.runtime.state.OperatorStateStore}
+ * On #initializeState the implementing class receives the {@link OperatorStateStore}
  * to store it's state. At least before each snapshot, all state persistent state must be stored in the state store.
  *
  * When the backend is received for initialization, the user registers states with the backend via

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 428442d..72f30b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -108,7 +108,7 @@ public abstract class AbstractUdfStreamOperator
 				ListCheckpointed listCheckpointedFun = (ListCheckpointed) userFunction;
 				ListState listState = getOperatorStateBackend().
-						getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+						getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 				List list = new ArrayList<>();
@@ -202,7 +202,7 @@ public abstract class AbstractUdfStreamOperator
 				((ListCheckpointed) userFunction).snapshotState(checkpointId, timestamp);
 				ListState listState = getOperatorStateBackend().
-						getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+						getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 				listState.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
index 50cdc02..d2f7e0d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 @Deprecated
@@ -45,8 +44,8 @@ public interface StreamCheckpointedOperator {
 	 * This method restores the operator state (if the operator is stateful) and the key/value state
 	 * (if it had been used and was initialized when the snapshot occurred).
 	 *
-	 * This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
-	 * and before {@link #open()}.
+	 * This method is called after {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
+	 * and before {@link StreamOperator#open()}.
 	 *
 	 * @param in The stream from which we have to restore our state.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 88c3ba4..9802a16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -85,7 +85,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
  *
  * The life cycle of the task is set up as follows:
  * {@code
- *  -- getPartitionableState() -> restores state of all operators in the chain
+ *  -- getOperatorState() -> restores state of all operators in the chain
  *
  *  -- invoke()
  *        |

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 31ccc28..f6e7dca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -559,7 +559,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		public void open() throws Exception {
 			super.open();
 
-			ListState partitionableState = getOperatorStateBackend().getPartitionableState(TEST_DESCRIPTOR);
+			ListState partitionableState = getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR);
 
 			if (numberSnapshotCalls == 0) {
 				for (Integer v : partitionableState.get()) {
@@ -582,7 +582,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 				long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
 
 			ListState partitionableState =
-					getOperatorStateBackend().getPartitionableState(TEST_DESCRIPTOR);
+					getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR);
 			partitionableState.clear();
 
 			partitionableState.add(42);