From: srichter@apache.org
To: commits@flink.apache.org
Date: Sun, 25 Feb 2018 16:11:36 -0000
Subject: [02/26] flink git commit: [hotfix] Remove outdated class OperatorStateHandles and replace it with OperatorSubtaskState

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index f85b7fb..6926480 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
@@ -64,7 +65,6 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -145,7 +145,7 @@ public class WindowOperatorTest extends TestLogger {
     TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
     // do a snapshot, close and restore again
-    OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
     testHarness.close();
 
     expectedOutput.clear();
@@ -267,7 +267,7 @@ public class WindowOperatorTest extends TestLogger {
     TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
     // do a snapshot, close and restore again
-    OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
     TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
     testHarness.close();
@@ -402,7 +402,7 @@ public class WindowOperatorTest extends TestLogger {
     testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 
     // do a snapshot, close and restore again
-    OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
     TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
@@ -480,7 +480,7 @@ public class WindowOperatorTest extends TestLogger {
     testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 
     // do a snapshot, close and restore again
-    OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
     TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
@@ -555,7 +555,7 @@ public class WindowOperatorTest extends TestLogger {
     testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
 
     // do a snapshot, close and restore again
-    OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
     testHarness.close();
 
     testHarness = createTestHarness(operator);
@@ -628,7 +628,7 @@ public class WindowOperatorTest extends TestLogger {
     testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
 
     // do a snapshot, close and restore again
-    OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
     testHarness.close();
 
     testHarness = createTestHarness(operator);
@@ -709,7 +709,7 @@ public class WindowOperatorTest extends TestLogger {
     testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 
     // do a snapshot, close and restore again
-    OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
     testHarness.close();
 
     expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499));
@@ -791,7 +791,7 @@ public class WindowOperatorTest extends TestLogger {
     expectedOutput.add(new Watermark(3000));
 
     // do a snapshot, close and restore again
-    OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
     TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
@@ -846,7 +846,7 @@ public class WindowOperatorTest extends TestLogger {
       null /* late data output tag */);
 
     ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
-    OperatorStateHandles snapshot;
+    OperatorSubtaskState snapshot;
 
     try (OneInputStreamOperatorTestHarness, Tuple3> testHarness = createTestHarness(operator)) {
@@ -1017,7 +1017,7 @@ public class WindowOperatorTest extends TestLogger {
     testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
 
     // do a snapshot, close and restore again
-    OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
     testHarness.close();
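The WindowOperatorTest changes above are purely mechanical: only the declared type of the snapshot changes from OperatorStateHandles to OperatorSubtaskState, while the harness calls stay the same. For illustration, a minimal sketch of the snapshot/close/restore cycle these tests follow; the operator, the createTestHarness(...) factory and the tuple types are placeholders and not part of this commit:

    // Sketch only: assumes the imports shown in the hunks above and a
    // createTestHarness(...) factory like the one in WindowOperatorTest.
    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
        createTestHarness(operator);
    testHarness.open();
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10L));

    // do a snapshot, close and restore again -- the snapshot is now an OperatorSubtaskState
    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
    testHarness.close();

    testHarness = createTestHarness(operator);
    testHarness.setup();
    testHarness.initializeState(snapshot);
    testHarness.open();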
http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index ced22c0..d38cb28 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -57,7 +57,6 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -320,7 +319,7 @@ public class AbstractStreamOperatorTestHarness implements AutoCloseable {
   * in the local key-group range and the operator states that would be assigned to the local
   * subtask.
   */
-  public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
+  public void initializeState(OperatorSubtaskState operatorStateHandles) throws Exception {
     if (!setupCalled) {
       setup();
     }
@@ -391,12 +390,12 @@ public class AbstractStreamOperatorTestHarness implements AutoCloseable {
   }
 
   /**
-   * Takes the different {@link OperatorStateHandles} created by calling {@link #snapshot(long, long)}
+   * Takes the different {@link OperatorSubtaskState} created by calling {@link #snapshot(long, long)}
   * on different instances of {@link AbstractStreamOperatorTestHarness} (each one representing one subtask)
-   * and repacks them into a single {@link OperatorStateHandles} so that the parallelism of the test
+   * and repacks them into a single {@link OperatorSubtaskState} so that the parallelism of the test
   * can change arbitrarily (i.e. be able to scale both up and down).
   *
-   * <p>After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
+   * <p>After repacking the partial states, use {@link #initializeState(OperatorSubtaskState)} to initialize
   * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you
   * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
   *
@@ -409,7 +408,7 @@ public class AbstractStreamOperatorTestHarness implements AutoCloseable {
   * @param handles the different states to be merged.
   * @return the resulting state, or {@code null} if no partial states are specified.
   */
-  public static OperatorStateHandles repackageState(OperatorStateHandles... handles) throws Exception {
+  public static OperatorSubtaskState repackageState(OperatorSubtaskState... handles) throws Exception {
 
     if (handles.length < 1) {
       return null;
@@ -423,7 +422,7 @@ public class AbstractStreamOperatorTestHarness implements AutoCloseable {
     List mergedManagedKeyedState = new ArrayList<>(handles.length);
     List mergedRawKeyedState = new ArrayList<>(handles.length);
 
-    for (OperatorStateHandles handle: handles) {
+    for (OperatorSubtaskState handle: handles) {
 
       Collection managedOperatorState = handle.getManagedOperatorState();
       Collection rawOperatorState = handle.getRawOperatorState();
@@ -447,12 +446,11 @@ public class AbstractStreamOperatorTestHarness implements AutoCloseable {
       }
     }
 
-    return new OperatorStateHandles(
-      0,
-      mergedManagedKeyedState,
-      mergedRawKeyedState,
+    return new OperatorSubtaskState(
       mergedManagedOperatorState,
-      mergedRawOperatorState);
+      mergedRawOperatorState,
+      mergedManagedKeyedState,
+      mergedRawKeyedState);
   }
 
   /**
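The javadoc above describes the rescaling workflow that repackageState enables. A hedged sketch of scaling a test from two subtasks down to one, assuming the three-int constructor takes max parallelism, parallelism and subtask index; operator1/2/3 and the types are placeholders, only repackageState and initializeState are taken from this diff:

    // Sketch of the rescaling pattern described in the javadoc above; the parallelism
    // values and operators are illustrative.
    AbstractStreamOperatorTestHarness<String> subtask0 =
        new AbstractStreamOperatorTestHarness<>(operator1, 2, 2, 0); // maxParallelism, parallelism, subtask index (assumed order)
    AbstractStreamOperatorTestHarness<String> subtask1 =
        new AbstractStreamOperatorTestHarness<>(operator2, 2, 2, 1);
    subtask0.open();
    subtask1.open();

    // snapshot each subtask separately, then merge the partial snapshots
    OperatorSubtaskState merged = AbstractStreamOperatorTestHarness.repackageState(
        subtask0.snapshot(0L, 0L),
        subtask1.snapshot(0L, 0L));

    // restore with a different parallelism; initializeState() keeps only the state in the
    // new subtask's key-group range, as described in the initializeState javadoc above
    AbstractStreamOperatorTestHarness<String> restored =
        new AbstractStreamOperatorTestHarness<>(operator3, 2, 1, 0);
    restored.initializeState(merged);
    restored.open();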
@@ -470,7 +468,7 @@ public class AbstractStreamOperatorTestHarness implements AutoCloseable {
   /**
   * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions, org.apache.flink.runtime.state.CheckpointStreamFactory)}.
   */
-  public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
+  public OperatorSubtaskState snapshot(long checkpointId, long timestamp) throws Exception {
 
     OperatorSnapshotResult operatorStateResult = operator.snapshotState(
       checkpointId,
@@ -484,12 +482,11 @@ public class AbstractStreamOperatorTestHarness implements AutoCloseable {
     OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
     OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
 
-    return new OperatorStateHandles(
-      0,
-      keyedManaged != null ? Collections.singletonList(keyedManaged) : null,
-      keyedRaw != null ? Collections.singletonList(keyedRaw) : null,
-      opManaged != null ? Collections.singletonList(opManaged) : null,
-      opRaw != null ? Collections.singletonList(opRaw) : null);
+    return new OperatorSubtaskState(
+      opManaged != null ? Collections.singletonList(opManaged) : Collections.emptyList(),
+      opRaw != null ? Collections.singletonList(opRaw) : Collections.emptyList(),
+      keyedManaged != null ? Collections.singletonList(keyedManaged) : Collections.emptyList(),
+      keyedRaw != null ? Collections.singletonList(keyedRaw) : Collections.emptyList());
   }
 
   /**
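The construction above changes in three ways that are easy to miss in the diff: the old chain-index argument is gone, the operator state now comes before the keyed state, and absent state is passed as Collections.emptyList() instead of null. Mapping the old argument positions onto the new constructor, using only the two constructor calls replaced in this commit (the variable names are the ones used in the hunks):

    OperatorSubtaskState state = new OperatorSubtaskState(
        managedOperatorState,   // was the 4th OperatorStateHandles argument
        rawOperatorState,       // was the 5th
        managedKeyedState,      // was the 2nd
        rawKeyedState);         // was the 3rd; the old 1st argument (chain index, always 0) is dropped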
http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 33f32e9..8d37266 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -18,10 +18,10 @@
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -34,7 +34,7 @@ import java.util.Collection;
 import java.util.List;
 
 /**
- * Util for writing/reading {@link org.apache.flink.streaming.runtime.tasks.OperatorStateHandles},
+ * Util for writing/reading {@link OperatorSubtaskState},
 * for use in tests.
 */
 public class OperatorSnapshotUtil {
@@ -45,12 +45,13 @@ public class OperatorSnapshotUtil {
     return resource.getFile();
   }
 
-  public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
+  public static void writeStateHandle(OperatorSubtaskState state, String path) throws IOException {
     FileOutputStream out = new FileOutputStream(path);
 
     try (DataOutputStream dos = new DataOutputStream(out)) {
 
-      dos.writeInt(state.getOperatorChainIndex());
+      // must be here for compatibility
+      dos.writeInt(0);
 
       // still required for compatibility
       SavepointV1Serializer.serializeStreamStateHandle(null, dos);
@@ -103,18 +104,19 @@ public class OperatorSnapshotUtil {
     }
   }
 
-  public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException {
+  public static OperatorSubtaskState readStateHandle(String path) throws IOException, ClassNotFoundException {
     FileInputStream in = new FileInputStream(path);
     try (DataInputStream dis = new DataInputStream(in)) {
 
-      int index = dis.readInt();
+
+      // ignored
+      dis.readInt();
 
       // still required for compatibility to consume the bytes.
       SavepointV1Serializer.deserializeStreamStateHandle(dis);
 
-      List rawOperatorState = null;
+      List rawOperatorState = new ArrayList<>();
       int numRawOperatorStates = dis.readInt();
       if (numRawOperatorStates >= 0) {
-        rawOperatorState = new ArrayList<>();
         for (int i = 0; i < numRawOperatorStates; i++) {
           OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
             dis);
@@ -122,10 +124,9 @@ public class OperatorSnapshotUtil {
         }
       }
 
-      List managedOperatorState = null;
+      List managedOperatorState = new ArrayList<>();
       int numManagedOperatorStates = dis.readInt();
       if (numManagedOperatorStates >= 0) {
-        managedOperatorState = new ArrayList<>();
         for (int i = 0; i < numManagedOperatorStates; i++) {
           OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
             dis);
@@ -133,10 +134,9 @@ public class OperatorSnapshotUtil {
         }
       }
 
-      List rawKeyedState = null;
+      List rawKeyedState = new ArrayList<>();
       int numRawKeyedStates = dis.readInt();
       if (numRawKeyedStates >= 0) {
-        rawKeyedState = new ArrayList<>();
         for (int i = 0; i < numRawKeyedStates; i++) {
           KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
             dis);
@@ -144,10 +144,9 @@ public class OperatorSnapshotUtil {
         }
       }
 
-      List managedKeyedState = null;
+      List managedKeyedState = new ArrayList<>();
       int numManagedKeyedStates = dis.readInt();
       if (numManagedKeyedStates >= 0) {
-        managedKeyedState = new ArrayList<>();
         for (int i = 0; i < numManagedKeyedStates; i++) {
           KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
             dis);
@@ -155,12 +154,11 @@ public class OperatorSnapshotUtil {
         }
       }
 
-      return new OperatorStateHandles(
-        index,
-        managedKeyedState,
-        rawKeyedState,
+      return new OperatorSubtaskState(
         managedOperatorState,
-        rawOperatorState);
+        rawOperatorState,
+        managedKeyedState,
+        rawKeyedState);
     }
   }
 }
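OperatorSnapshotUtil lets a test persist a harness snapshot to a file and read it back later, typically for savepoint/serializer migration tests. A hedged usage sketch based only on the two method signatures changed above; the file path and the two harness variables are placeholders:

    // Sketch only: testHarness/newTestHarness and the path are illustrative.
    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
    OperatorSnapshotUtil.writeStateHandle(snapshot, "/tmp/my-operator-snapshot");

    // later, possibly with a different operator or serializer version:
    OperatorSubtaskState restored = OperatorSnapshotUtil.readStateHandle("/tmp/my-operator-snapshot");
    newTestHarness.initializeState(restored);
    newTestHarness.open();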
http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index 908666a..aed6663 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -45,7 +46,6 @@ import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.DynamicCodeLoadingException;
@@ -307,7 +307,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
       new URL[]{rootPath.toURI().toURL()},
       Thread.currentThread().getContextClassLoader());
 
-    OperatorStateHandles stateHandles = runOperator(
+    OperatorSubtaskState stateHandles = runOperator(
       taskConfiguration,
       executionConfig,
       new StreamMap<>(new StatefulMapper(isKeyedState, false, hasBField)),
@@ -340,7 +340,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
       inputs);
   }
 
-  private OperatorStateHandles runOperator(
+  private OperatorSubtaskState runOperator(
       Configuration taskConfiguration,
       ExecutionConfig executionConfig,
       OneInputStreamOperator operator,
@@ -348,7 +348,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
       boolean isKeyedState,
       StateBackend stateBackend,
       ClassLoader classLoader,
-      OperatorStateHandles operatorStateHandles,
+      OperatorSubtaskState operatorStateHandles,
       Iterable input) throws Exception {
 
     try (final MockEnvironment environment = new MockEnvironment(