flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [07/12] flink git commit: [FLINK-6830] [tests] Port topology change migration ITCases for Flink 1.3
Date Wed, 07 Jun 2017 16:30:04 GMT
[FLINK-6830] [tests] Port topology change migration ITCases for Flink 1.3


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b3967ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b3967ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b3967ca

Branch: refs/heads/master
Commit: 7b3967cac38863b959b00fb1489b3902dcb5cd83
Parents: 33c49e7
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Sat Jun 3 21:56:54 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Wed Jun 7 18:28:58 2017 +0200

----------------------------------------------------------------------
 .../AbstractOperatorRestoreTestBase.java        |  14 ++--
 .../state/operator/restore/ExecutionMode.java   |   6 +-
 .../AbstractKeyedOperatorRestoreTestBase.java   |  70 +++++++++++++++++++
 .../restore/keyed/KeyedComplexChainTest.java    |  22 +-----
 .../state/operator/restore/keyed/KeyedJob.java  |   7 +-
 ...AbstractNonKeyedOperatorRestoreTestBase.java |  26 ++++++-
 .../restore/unkeyed/ChainBreakTest.java         |   4 ++
 .../unkeyed/ChainLengthDecreaseTest.java        |   4 ++
 .../unkeyed/ChainLengthIncreaseTest.java        |   4 ++
 .../restore/unkeyed/ChainOrderTest.java         |   4 ++
 .../restore/unkeyed/ChainUnionTest.java         |   4 ++
 .../operator/restore/unkeyed/NonKeyedJob.java   |   6 +-
 .../complexKeyed-flink1.2/_metadata             | Bin 0 -> 134953 bytes
 .../complexKeyed-flink1.3/_metadata             | Bin 0 -> 163526 bytes
 .../operatorstate/complexKeyed/_metadata        | Bin 134953 -> 0 bytes
 .../operatorstate/nonKeyed-flink1.2/_metadata   | Bin 0 -> 3212 bytes
 .../operatorstate/nonKeyed-flink1.3/_metadata   | Bin 0 -> 6248 bytes
 .../resources/operatorstate/nonKeyed/_metadata  | Bin 3212 -> 0 bytes
 18 files changed, 130 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 82e8d94..f087cf4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -65,11 +65,11 @@ import java.net.URL;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Abstract class to verify that it is possible to migrate a 1.2 savepoint to 1.3 and that
the topology can be modified
- * from that point on.
+ * Abstract class to verify that it is possible to migrate a savepoint across upgraded Flink
versions and that the
+ * topology can be modified from that point on.
  * 
  * The verification is done in 2 Steps:
- * Step 1: Migrate the job to 1.3 by submitting the same job used for the 1.2 savepoint,
and create a new savepoint.
+ * Step 1: Migrate the job to the newer version by submitting the same job used for the old
version savepoint, and create a new savepoint.
  * Step 2: Modify the job topology, and restore from the savepoint created in step 1.
  */
 public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
@@ -160,9 +160,9 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger
{
 
 	@Test
 	public void testMigrationAndRestore() throws Throwable {
-		// submit 1.2 job and create a migrated 1.3 savepoint
+		// submit job with old version savepoint and create a migrated savepoint in the new version
 		String savepointPath = migrateJob();
-		// restore from migrated 1.3 savepoint
+		// restore from migrated new version savepoint
 		restoreJob(savepointPath);
 	}
 
@@ -256,14 +256,14 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger
{
 	}
 
 	/**
-	 * Recreates the job used to create the 1.2 savepoint.
+	 * Recreates the job used to create the new version savepoint.
 	 *
 	 * @param env StreamExecutionEnvironment to use
 	 */
 	protected abstract void createMigrationJob(StreamExecutionEnvironment env);
 
 	/**
-	 * Creates a modified version of the job used to create the 1.2 savepoint.
+	 * Creates a modified version of the job used to create the new version savepoint.
 	 *
 	 * @param env StreamExecutionEnvironment to use
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
index f333aca..ae9fb21 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
@@ -20,9 +20,9 @@ package org.apache.flink.test.state.operator.restore;
 /**
  * Enum to control function behavior for the different test stages.
  * 
- * {@link ExecutionMode#GENERATE} should be used when creating the 1.2 savepoint.
- * {@link ExecutionMode#MIGRATE} should be used when migrating the 1.2 savepoint to 1.3.
- * {@link ExecutionMode#RESTORE} should be used when restoring from the migrated 1.3 savepoint.
+ * {@link ExecutionMode#GENERATE} should be used when creating the savepoint.
+ * {@link ExecutionMode#MIGRATE} should be used when migrating the savepoint to a newer version.
+ * {@link ExecutionMode#RESTORE} should be used when restoring from the migrated newer version
savepoint.
  */
 public enum ExecutionMode {
 	GENERATE,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
new file mode 100644
index 0000000..1b66c21
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state.operator.restore.keyed;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Base class for all keyed operator restore tests.
+ */
+@RunWith(Parameterized.class)
+public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase
{
+
+	private final String savepointPath;
+
+	@Parameterized.Parameters(name = "Migrate Savepoint: {0}")
+	public static Collection<String> parameters () {
+		return Arrays.asList(
+			"complexKeyed-flink1.2",
+			"complexKeyed-flink1.3");
+	}
+
+	public AbstractKeyedOperatorRestoreTestBase(String savepointPath) {
+		this.savepointPath = savepointPath;
+	}
+
+	@Override
+	public void createMigrationJob(StreamExecutionEnvironment env) {
+		/**
+		 * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
+		 */
+		SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env,
ExecutionMode.MIGRATE);
+
+		SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.MIGRATE,
source);
+
+		SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.MIGRATE,
window);
+
+		SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.MIGRATE,
first);
+	}
+
+	@Override
+	protected String getMigrationSavepointName() {
+		return savepointPath;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
index 28cd15a..605722d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
@@ -20,23 +20,12 @@ package org.apache.flink.test.state.operator.restore.keyed;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
 
-public class KeyedComplexChainTest extends AbstractOperatorRestoreTestBase {
+public class KeyedComplexChainTest extends AbstractKeyedOperatorRestoreTestBase {
 
-	@Override
-	public void createMigrationJob(StreamExecutionEnvironment env) {
-		/**
-		 * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
-		 */
-		SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env,
ExecutionMode.MIGRATE);
-
-		SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.MIGRATE,
source);
-
-		SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.MIGRATE,
window);
-
-		SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.MIGRATE,
first);
+	public KeyedComplexChainTest(String savepointPath) {
+		super(savepointPath);
 	}
 
 	@Override
@@ -53,9 +42,4 @@ public class KeyedComplexChainTest extends AbstractOperatorRestoreTestBase
{
 		SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.RESTORE,
second);
 		first.startNewChain();
 	}
-
-	@Override
-	protected final String getMigrationSavepointName() {
-		return "complexKeyed";
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
index 3c28c3b..95d0efc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -44,7 +44,8 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- * Savepoint generator to create the job used by the {@link KeyedComplexChainTest}.
+ * Savepoint generator to create the savepoint used by the {@link AbstractKeyedOperatorRestoreTestBase}.
+ * Switch to specific version branches and run this job to create savepoints of different
Flink versions.
  *
  * The job should be cancelled manually through the REST API using the cancel-with-savepoint
operation.
  */
@@ -234,8 +235,4 @@ public class KeyedJob {
 			}
 		}
 	}
-
-
-	private KeyedJob() {
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
index 5b51765..22fa7b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -23,6 +23,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
@@ -30,10 +36,24 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
 
 /**
- * All classes extending this class will use the same savepoint and migration job.
+ * Base class for all non-keyed operator restore tests.
  */
+@RunWith(Parameterized.class)
 public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase
{
 
+	private final String savepointPath;
+
+	@Parameterized.Parameters(name = "Migrate Savepoint: {0}")
+	public static Collection<String> parameters () {
+		return Arrays.asList(
+			"nonKeyed-flink1.2",
+			"nonKeyed-flink1.3");
+	}
+
+	public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) {
+		this.savepointPath = savepointPath;
+	}
+
 	@Override
 	public void createMigrationJob(StreamExecutionEnvironment env) {
 		/**
@@ -53,7 +73,7 @@ public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOp
 	}
 
 	@Override
-	protected final String getMigrationSavepointName() {
-		return "nonKeyed";
+	protected String getMigrationSavepointName() {
+		return savepointPath;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
index 6838070..8055833 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  */
 public class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
+	public ChainBreakTest(String savepointPath) {
+		super(savepointPath);
+	}
+
 	@Override
 	public void createRestoredJob(StreamExecutionEnvironment env) {
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
index e405e76..3235387 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
@@ -32,6 +32,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  */
 public class ChainLengthDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
+	public ChainLengthDecreaseTest(String savepointPath) {
+		super(savepointPath);
+	}
+
 	@Override
 	public void createRestoredJob(StreamExecutionEnvironment env) {
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
index b78aa10..a10f99c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  */
 public class ChainLengthIncreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
+	public ChainLengthIncreaseTest(String savepointPath) {
+		super(savepointPath);
+	}
+
 	@Override
 	public void createRestoredJob(StreamExecutionEnvironment env) {
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
index 7c68b4e..0baa233 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  */
 public class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
+	public ChainOrderTest(String savepointPath) {
+		super(savepointPath);
+	}
+
 	@Override
 	public void createRestoredJob(StreamExecutionEnvironment env) {
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
index 3f2fba4..0d21e8a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  */
 public class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
+	public ChainUnionTest(String savepointPath) {
+		super(savepointPath);
+	}
+
 	@Override
 	public void createRestoredJob(StreamExecutionEnvironment env) {
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
index 32067b3..08a4c67 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
@@ -37,7 +37,8 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
- * Savepoint generator to create the job used by the {@link AbstractNonKeyedOperatorRestoreTestBase}.
+ * Savepoint generator to create the savepoint used by the {@link AbstractNonKeyedOperatorRestoreTestBase}.
+ * Switch to specific version branches and run this job to create savepoints of different
Flink versions.
  *
  * The job should be cancelled manually through the REST API using the cancel-with-savepoint
operation.
  */
@@ -192,7 +193,4 @@ public class NonKeyedJob {
 			}
 		}
 	}
-
-	private NonKeyedJob() {
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata
b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata
new file mode 100644
index 0000000..0a1ed10
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata
b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata
new file mode 100644
index 0000000..a4f5a1e
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
deleted file mode 100644
index 0a1ed10..0000000
Binary files a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata and /dev/null
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata
new file mode 100644
index 0000000..8fcd1ea
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata
new file mode 100644
index 0000000..46169e0
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7b3967ca/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
deleted file mode 100644
index 8fcd1ea..0000000
Binary files a/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata and /dev/null
differ


Mime
View raw message