Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 32650200BC8 for ; Wed, 23 Nov 2016 16:13:55 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 30C5F160AFB; Wed, 23 Nov 2016 15:13:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 04F2A160AFA for ; Wed, 23 Nov 2016 16:13:53 +0100 (CET) Received: (qmail 19366 invoked by uid 500); 23 Nov 2016 15:13:53 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 19357 invoked by uid 99); 23 Nov 2016 15:13:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Nov 2016 15:13:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 15003DFCC4; Wed, 23 Nov 2016 15:13:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.apache.org Message-Id: <358eb35dfd2b40708f6cbf82a7748060@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-5000] Rename Methods in ManagedInitializationContext Date: Wed, 23 Nov 2016 15:13:53 +0000 (UTC) archived-at: Wed, 23 Nov 2016 15:13:55 -0000 Repository: flink Updated Branches: refs/heads/master 3e3a90d89 -> 4656350fc [FLINK-5000] Rename Methods in ManagedInitializationContext This removes "managed" from the OperatorStateStore and KeyedStateStore access methods. There is no "un-managed" state and users might be wondering what "managed" means here. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4656350f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4656350f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4656350f Branch: refs/heads/master Commit: 4656350fc33d42ff96ad6d5e836e62172b4b0de6 Parents: 3e3a90d Author: Aljoscha Krettek Authored: Wed Nov 23 14:58:33 2016 +0100 Committer: Aljoscha Krettek Committed: Wed Nov 23 16:13:29 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/state/ManagedInitializationContext.java | 4 ++-- .../flink/runtime/state/StateInitializationContextImpl.java | 6 +++--- .../streaming/connectors/fs/bucketing/BucketingSink.java | 2 +- .../streaming/connectors/kafka/FlinkKafkaConsumerBase.java | 2 +- .../streaming/connectors/kafka/FlinkKafkaProducerBase.java | 2 +- .../connectors/kafka/FlinkKafkaConsumerBaseTest.java | 8 ++++---- .../api/functions/source/ContinuousFileReaderOperator.java | 2 +- .../streaming/api/operators/AbstractUdfStreamOperator.java | 2 +- .../api/operators/StreamOperatorSnapshotRestoreTest.java | 8 ++------ .../org/apache/flink/test/checkpointing/RescalingITCase.java | 6 +++--- 10 files changed, 19 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java index abc528b..5255c43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java @@ -43,11 +43,11 @@ public interface ManagedInitializationContext { /** * Returns an interface that allows for registering operator state with the backend. */ - OperatorStateStore getManagedOperatorStateStore(); + OperatorStateStore getOperatorStateStore(); /** * Returns an interface that allows for registering keyed state with the backend. */ - KeyedStateStore getManagedKeyedStateStore(); + KeyedStateStore getKeyedStateStore(); } http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index b131d14..c86ff6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -122,12 +122,12 @@ public class StateInitializationContextImpl implements StateInitializationContex } @Override - public OperatorStateStore getManagedOperatorStateStore() { + public OperatorStateStore getOperatorStateStore() { return operatorStateStore; } @Override - public KeyedStateStore getManagedKeyedStateStore() { + public KeyedStateStore getKeyedStateStore() { return keyedStateStore; } @@ -268,4 +268,4 @@ public class StateInitializationContextImpl implements StateInitializationContex throw new UnsupportedOperationException("Read only Iterator"); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 1da56b4..cf2c373 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -350,7 +350,7 @@ public class BucketingSink this.refTruncate = reflectTruncate(fs); } - OperatorStateStore stateStore = context.getManagedOperatorStateStore(); + OperatorStateStore stateStore = context.getOperatorStateStore(); restoredBucketStates = stateStore.getSerializableListState("bucket-states"); int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 5161b35..aef7116 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -314,7 +314,7 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti @Override public void initializeState(FunctionInitializationContext context) throws Exception { - OperatorStateStore stateStore = context.getManagedOperatorStateStore(); + OperatorStateStore stateStore = context.getOperatorStateStore(); offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); if (context.isRestored()) { http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index 33289f8..d413f1c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -343,7 +343,7 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im @Override public void initializeState(FunctionInitializationContext context) throws Exception { - this.stateStore = context.getManagedOperatorStateStore(); + this.stateStore = context.getOperatorStateStore(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 9b7eabf..b96ba30 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -132,7 +132,7 @@ public class FlinkKafkaConsumerBaseTest { StateInitializationContext initializationContext = mock(StateInitializationContext.class); - when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); when(initializationContext.isRestored()).thenReturn(true); consumer.initializeState(initializationContext); @@ -172,7 +172,7 @@ public class FlinkKafkaConsumerBaseTest { StateInitializationContext initializationContext = mock(StateInitializationContext.class); - when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); when(initializationContext.isRestored()).thenReturn(false); consumer.initializeState(initializationContext); @@ -199,7 +199,7 @@ public class FlinkKafkaConsumerBaseTest { StateInitializationContext initializationContext = mock(StateInitializationContext.class); - when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); // make the context signal that there is no restored state, then validate that when(initializationContext.isRestored()).thenReturn(false); @@ -245,7 +245,7 @@ public class FlinkKafkaConsumerBaseTest { StateInitializationContext initializationContext = mock(StateInitializationContext.class); - when(initializationContext.getManagedOperatorStateStore()).thenReturn(backend); + when(initializationContext.getOperatorStateStore()).thenReturn(backend); when(initializationContext.isRestored()).thenReturn(false, true, true, true); consumer.initializeState(initializationContext); http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 74c58f9..bbe1ea5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -92,7 +92,7 @@ public class ContinuousFileReaderOperator extends AbstractStreamOperator @SuppressWarnings("unchecked") ListCheckpointed listCheckpointedFun = (ListCheckpointed) userFunction; - ListState listState = context.getManagedOperatorStateStore(). + ListState listState = context.getOperatorStateStore(). getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); List list = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index c02a7c3..08fbcbe 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -28,10 +28,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -39,13 +37,11 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; -import org.apache.flink.util.FutureUtil; import org.junit.Assert; import org.junit.Test; import java.io.InputStream; import java.util.BitSet; -import java.util.Collections; public class StreamOperatorSnapshotRestoreTest { @@ -173,8 +169,8 @@ public class StreamOperatorSnapshotRestoreTest { Assert.assertEquals(verifyRestore, context.isRestored()); - keyedState = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("managed-keyed", Integer.class, 0)); - opState = context.getManagedOperatorStateStore().getSerializableListState("managed-op-state"); + keyedState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("managed-keyed", Integer.class, 0)); + opState = context.getOperatorStateStore().getSerializableListState("managed-op-state"); if (context.isRestored()) { // check restored raw keyed state http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 09de67f..bc65abf 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -792,8 +792,8 @@ public class RescalingITCase extends TestLogger { @Override public void initializeState(FunctionInitializationContext context) throws Exception { - counter = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("counter", Integer.class, 0)); - sum = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("sum", Integer.class, 0)); + counter = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("counter", Integer.class, 0)); + sum = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("sum", Integer.class, 0)); } } @@ -937,7 +937,7 @@ public class RescalingITCase extends TestLogger { @Override public void initializeState(FunctionInitializationContext context) throws Exception { this.counterPartitions = - context.getManagedOperatorStateStore().getSerializableListState("counter_partitions"); + context.getOperatorStateStore().getSerializableListState("counter_partitions"); if (context.isRestored()) { for (int v : counterPartitions.get()) { counter += v;