flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [02/26] flink git commit: [hotfix] Remove outdated class OperatorStateHandles and replace it with OperatorSubtaskState
Date Sun, 25 Feb 2018 16:11:36 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index f85b7fb..6926480 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
@@ -64,7 +65,6 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -145,7 +145,7 @@ public class WindowOperatorTest extends TestLogger {
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.close();
 
 		expectedOutput.clear();
@@ -267,7 +267,7 @@ public class WindowOperatorTest extends TestLogger {
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 		testHarness.close();
 
@@ -402,7 +402,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 
 		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
@@ -480,7 +480,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 
 		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
@@ -555,7 +555,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
 
 		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.close();
 
 		testHarness = createTestHarness(operator);
@@ -628,7 +628,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
 
 		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.close();
 
 		testHarness = createTestHarness(operator);
@@ -709,7 +709,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 
 		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.close();
 
 		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499));
@@ -791,7 +791,7 @@ public class WindowOperatorTest extends TestLogger {
 		expectedOutput.add(new Watermark(3000));
 
 		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
@@ -846,7 +846,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-		OperatorStateHandles snapshot;
+		OperatorSubtaskState snapshot;
 
 		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				createTestHarness(operator)) {
@@ -1017,7 +1017,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
 
 		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
 		testHarness.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index ced22c0..d38cb28 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -57,7 +57,6 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -320,7 +319,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	 * in the local key-group range and the operator states that would be assigned to the local
 	 * subtask.
 	 */
-	public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
+	public void initializeState(OperatorSubtaskState operatorStateHandles) throws Exception {
 		if (!setupCalled) {
 			setup();
 		}
@@ -391,12 +390,12 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	}
 
 	/**
-	 * Takes the different {@link OperatorStateHandles} created by calling {@link #snapshot(long, long)}
+	 * Takes the different {@link OperatorSubtaskState} created by calling {@link #snapshot(long, long)}
 	 * on different instances of {@link AbstractStreamOperatorTestHarness} (each one representing one subtask)
-	 * and repacks them into a single {@link OperatorStateHandles} so that the parallelism of the test
+	 * and repacks them into a single {@link OperatorSubtaskState} so that the parallelism of the test
 	 * can change arbitrarily (i.e. be able to scale both up and down).
 	 *
-	 * <p>After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
+	 * <p>After repacking the partial states, use {@link #initializeState(OperatorSubtaskState)} to initialize
 	 * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you
 	 * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
 	 *
@@ -409,7 +408,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	 * @param handles the different states to be merged.
 	 * @return the resulting state, or {@code null} if no partial states are specified.
 	 */
-	public static OperatorStateHandles repackageState(OperatorStateHandles... handles) throws Exception {
+	public static OperatorSubtaskState repackageState(OperatorSubtaskState... handles) throws Exception {
 
 		if (handles.length < 1) {
 			return null;
@@ -423,7 +422,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		List<KeyedStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
 		List<KeyedStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
 
-		for (OperatorStateHandles handle: handles) {
+		for (OperatorSubtaskState handle: handles) {
 
 			Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState();
 			Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState();
@@ -447,12 +446,11 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 			}
 		}
 
-		return new OperatorStateHandles(
-			0,
-			mergedManagedKeyedState,
-			mergedRawKeyedState,
+		return new OperatorSubtaskState(
 			mergedManagedOperatorState,
-			mergedRawOperatorState);
+			mergedRawOperatorState,
+			mergedManagedKeyedState,
+			mergedRawKeyedState);
 	}
 
 	/**
@@ -470,7 +468,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	/**
 	 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions, org.apache.flink.runtime.state.CheckpointStreamFactory)}.
 	 */
-	public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
+	public OperatorSubtaskState snapshot(long checkpointId, long timestamp) throws Exception {
 
 		OperatorSnapshotResult operatorStateResult = operator.snapshotState(
 			checkpointId,
@@ -484,12 +482,11 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
 		OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
 
-		return new OperatorStateHandles(
-			0,
-			keyedManaged != null ? Collections.singletonList(keyedManaged) : null,
-			keyedRaw != null ? Collections.singletonList(keyedRaw) : null,
-			opManaged != null ? Collections.singletonList(opManaged) : null,
-			opRaw != null ? Collections.singletonList(opRaw) : null);
+		return new OperatorSubtaskState(
+			opManaged != null ? Collections.singletonList(opManaged) : Collections.emptyList(),
+			opRaw != null ? Collections.singletonList(opRaw) : Collections.emptyList(),
+			keyedManaged != null ? Collections.singletonList(keyedManaged) : Collections.emptyList(),
+			keyedRaw != null ? Collections.singletonList(keyedRaw) : Collections.emptyList());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 33f32e9..8d37266 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -34,7 +34,7 @@ import java.util.Collection;
 import java.util.List;
 
 /**
- * Util for writing/reading {@link org.apache.flink.streaming.runtime.tasks.OperatorStateHandles},
+ * Util for writing/reading {@link OperatorSubtaskState},
  * for use in tests.
  */
 public class OperatorSnapshotUtil {
@@ -45,12 +45,13 @@ public class OperatorSnapshotUtil {
 		return resource.getFile();
 	}
 
-	public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
+	public static void writeStateHandle(OperatorSubtaskState state, String path) throws IOException {
 		FileOutputStream out = new FileOutputStream(path);
 
 		try (DataOutputStream dos = new DataOutputStream(out)) {
 
-			dos.writeInt(state.getOperatorChainIndex());
+			// must be here for compatibility
+			dos.writeInt(0);
 
 			// still required for compatibility
 			SavepointV1Serializer.serializeStreamStateHandle(null, dos);
@@ -103,18 +104,19 @@ public class OperatorSnapshotUtil {
 		}
 	}
 
-	public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException {
+	public static OperatorSubtaskState readStateHandle(String path) throws IOException, ClassNotFoundException {
 		FileInputStream in = new FileInputStream(path);
 		try (DataInputStream dis = new DataInputStream(in)) {
-			int index = dis.readInt();
+
+			// ignored
+			dis.readInt();
 
 			// still required for compatibility to consume the bytes.
 			SavepointV1Serializer.deserializeStreamStateHandle(dis);
 
-			List<OperatorStateHandle> rawOperatorState = null;
+			List<OperatorStateHandle> rawOperatorState = new ArrayList<>();
 			int numRawOperatorStates = dis.readInt();
 			if (numRawOperatorStates >= 0) {
-				rawOperatorState = new ArrayList<>();
 				for (int i = 0; i < numRawOperatorStates; i++) {
 					OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
 						dis);
@@ -122,10 +124,9 @@ public class OperatorSnapshotUtil {
 				}
 			}
 
-			List<OperatorStateHandle> managedOperatorState = null;
+			List<OperatorStateHandle> managedOperatorState = new ArrayList<>();
 			int numManagedOperatorStates = dis.readInt();
 			if (numManagedOperatorStates >= 0) {
-				managedOperatorState = new ArrayList<>();
 				for (int i = 0; i < numManagedOperatorStates; i++) {
 					OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
 						dis);
@@ -133,10 +134,9 @@ public class OperatorSnapshotUtil {
 				}
 			}
 
-			List<KeyedStateHandle> rawKeyedState = null;
+			List<KeyedStateHandle> rawKeyedState = new ArrayList<>();
 			int numRawKeyedStates = dis.readInt();
 			if (numRawKeyedStates >= 0) {
-				rawKeyedState = new ArrayList<>();
 				for (int i = 0; i < numRawKeyedStates; i++) {
 					KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
 						dis);
@@ -144,10 +144,9 @@ public class OperatorSnapshotUtil {
 				}
 			}
 
-			List<KeyedStateHandle> managedKeyedState = null;
+			List<KeyedStateHandle> managedKeyedState = new ArrayList<>();
 			int numManagedKeyedStates = dis.readInt();
 			if (numManagedKeyedStates >= 0) {
-				managedKeyedState = new ArrayList<>();
 				for (int i = 0; i < numManagedKeyedStates; i++) {
 					KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
 						dis);
@@ -155,12 +154,11 @@ public class OperatorSnapshotUtil {
 				}
 			}
 
-			return new OperatorStateHandles(
-				index,
-				managedKeyedState,
-				rawKeyedState,
+			return new OperatorSubtaskState(
 				managedOperatorState,
-				rawOperatorState);
+				rawOperatorState,
+				managedKeyedState,
+				rawKeyedState);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index 908666a..aed6663 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -45,7 +46,6 @@ import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.DynamicCodeLoadingException;
@@ -307,7 +307,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			new URL[]{rootPath.toURI().toURL()},
 			Thread.currentThread().getContextClassLoader());
 
-		OperatorStateHandles stateHandles = runOperator(
+		OperatorSubtaskState stateHandles = runOperator(
 			taskConfiguration,
 			executionConfig,
 			new StreamMap<>(new StatefulMapper(isKeyedState, false, hasBField)),
@@ -340,7 +340,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			inputs);
 	}
 
-	private OperatorStateHandles runOperator(
+	private OperatorSubtaskState runOperator(
 			Configuration taskConfiguration,
 			ExecutionConfig executionConfig,
 			OneInputStreamOperator<Long, Long> operator,
@@ -348,7 +348,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			boolean isKeyedState,
 			StateBackend stateBackend,
 			ClassLoader classLoader,
-			OperatorStateHandles operatorStateHandles,
+			OperatorSubtaskState operatorStateHandles,
 			Iterable<Long> input) throws Exception {
 
 		try (final MockEnvironment environment = new MockEnvironment(


Mime
View raw message