flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [06/15] flink git commit: [FLINK-6883] [tests] Add migration tests for Scala jobs
Date Tue, 13 Jun 2017 05:17:25 GMT
[FLINK-6883] [tests] Add migration tests for Scala jobs

This commit adds migration ITCases for jobs written using the Scala API.
An extra concern for migration of Scala jobs is that Scala case classes
and collections use anonymous generated serializers, which may affect
state restore.

This closes #4103.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7aad0ecd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7aad0ecd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7aad0ecd

Branch: refs/heads/master
Commit: 7aad0ecd80f3c21fd21991e4c29d42a7802d95b9
Parents: 69fada3d
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Sun Jun 11 15:31:42 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Jun 13 06:38:01 2017 +0200

----------------------------------------------------------------------
 .../api/scala/typeutils/OptionTypeInfo.scala    |   5 +-
 ...gration-itcase-flink1.2-jobmanager-savepoint | Bin 0 -> 92818 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 92818 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 ...gration-itcase-flink1.2-jobmanager-savepoint | Bin 0 -> 92818 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 92818 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../scala/migration/MigrationTestTypes.scala    |  28 ++
 .../ScalaSerializersMigrationTest.scala         | 110 +++++++
 .../StatefulJobSavepointMigrationITCase.scala   | 303 +++++++++++++++++++
 12 files changed, 445 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 73fe580..d2e66a5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Public, PublicEvolving}
+import org.apache.flink.annotation.{Public, PublicEvolving, VisibleForTesting}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
@@ -85,4 +85,7 @@ class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformatio
   override def hashCode: Int = {
     elemTypeInfo.hashCode()
   }
+
+  @VisibleForTesting
+  def getElemTypeInfo: TypeInformation[A] = elemTypeInfo
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
new file mode 100644
index 0000000..3d0f8c5
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
new file mode 100644
index 0000000..5a763df
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
new file mode 100644
index 0000000..e183e51
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..612bc1b
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
new file mode 100644
index 0000000..9b90ac8
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
new file mode 100644
index 0000000..99777a1
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
new file mode 100644
index 0000000..6adf433
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..d9eaa72
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
new file mode 100644
index 0000000..4ae57c4
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+case class CustomCaseClass(a: String, b: Long)
+
+case class CustomCaseClassWithNesting(a: Long, nested: CustomCaseClass)
+
+object CustomEnum extends Enumeration {
+  type CustomEnum = Value
+  val ONE, TWO, THREE, FOUR = Value
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
new file mode 100644
index 0000000..a2edc90
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils._
+import org.junit.{Assert, Test}
+
+import scala.util.Try
+
+class ScalaSerializersMigrationTest {
+
+  /**
+   * Verifies that the generated classnames for anonymous Scala serializers remain the same.
+   *
+   * The classnames in this test are collected from running the same type information generation
+   * code in previous version branches. They should not change across different Flink versions.
+   */
+  @Test
+  def testStableAnonymousClassnameGeneration(): Unit = {
+    val caseClassInfo = createTypeInformation[CustomCaseClass]
+    val caseClassWithNestingInfo =
+      createTypeInformation[CustomCaseClassWithNesting]
+        .asInstanceOf[CaseClassTypeInfo[_]]
+    val traversableInfo =
+      createTypeInformation[List[CustomCaseClass]]
+        .asInstanceOf[TraversableTypeInfo[_,_]]
+    val tryInfo =
+      createTypeInformation[Try[CustomCaseClass]]
+        .asInstanceOf[TryTypeInfo[_,_]]
+    val optionInfo =
+      createTypeInformation[Option[CustomCaseClass]]
+        .asInstanceOf[OptionTypeInfo[_,_]]
+    val eitherInfo =
+      createTypeInformation[Either[CustomCaseClass, String]]
+        .asInstanceOf[EitherTypeInfo[_,_,_]]
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$8",
+      caseClassInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$8$$anon$1",
+      caseClassInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9",
+      caseClassWithNestingInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$3",
+      caseClassWithNestingInfo.createSerializer(new ExecutionConfig).getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$10",
+      caseClassWithNestingInfo.getTypeAt("nested").getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$10$$anon$2",
+      caseClassWithNestingInfo.getTypeAt("nested")
+        .createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$16",
+      traversableInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$16$$anon$12",
+      traversableInfo.createSerializer(new ExecutionConfig).getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$11",
+      traversableInfo.elementTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$11$$anon$4",
+      traversableInfo.elementTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$13",
+      tryInfo.elemTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$13$$anon$5",
+      tryInfo.elemTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$14",
+      optionInfo.getElemTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$14$$anon$6",
+      optionInfo.getElemTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$15",
+      eitherInfo.leftTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$15$$anon$7",
+      eitherInfo.leftTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
new file mode 100644
index 0000000..1e67042
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+import java.util
+
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.memory.MemoryStateBackend
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.runtime.state.{AbstractStateBackend, FunctionInitializationContext,
FunctionSnapshotContext}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.streaming.util.migration.MigrationVersion
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Assume, Ignore, Test}
+
+import scala.util.{Failure, Properties, Try}
+
+object StatefulJobSavepointMigrationITCase {
+
+  @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+  def parameters: util.Collection[(MigrationVersion, String)] = {
+    util.Arrays.asList(
+      (MigrationVersion.v1_2, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_2, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_3, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME))
+  }
+
+  // TODO to generate savepoints for a specific Flink version / backend type,
+  // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
+  // TODO set as (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME)
+  val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_3
+  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME
+
+  val SCALA_VERSION: String = {
+    val versionString = Properties.versionString.split(" ")(1)
+    versionString.substring(0, versionString.lastIndexOf("."))
+  }
+
+  val NUM_ELEMENTS = 4
+}
+
+/**
+ * ITCase for migration Scala state types across different Flink versions.
+ */
+@RunWith(classOf[Parameterized])
+class StatefulJobSavepointMigrationITCase(
+    migrationVersionAndBackend: (MigrationVersion, String))
+  extends SavepointMigrationTestBase with Serializable {
+
+  @Ignore
+  @Test
+  def testCreateSavepoint(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
+      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+        env.setStateBackend(new MemoryStateBackend())
+      case _ => throw new UnsupportedOperationException
+    }
+
+    env.setStateBackend(new MemoryStateBackend)
+    env.enableCheckpointing(500)
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    env
+      .addSource(
+        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
+      .keyBy(
+        new KeySelector[(Long, Long), Long] {
+          override def getKey(value: (Long, Long)): Long = value._1
+        }
+      )
+      .flatMap(new StatefulFlatMapper)
+      .addSink(new AccumulatorCountingSink)
+
+    executeAndSavepoint(
+      env,
+      s"src/test/resources/stateful-scala" +
+        s"${StatefulJobSavepointMigrationITCase.SCALA_VERSION}" +
+        s"-udf-migration-itcase-flink" +
+        s"${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_VER}" +
+        s"-${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
+      new Tuple2(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
+      )
+    )
+  }
+
+  @Test
+  def testRestoreSavepoint(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    migrationVersionAndBackend._2 match {
+      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+        env.setStateBackend(new MemoryStateBackend())
+      case _ => throw new UnsupportedOperationException
+    }
+
+    env.setStateBackend(new MemoryStateBackend)
+    env.enableCheckpointing(500)
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    env
+      .addSource(
+        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
+      .keyBy(
+        new KeySelector[(Long, Long), Long] {
+          override def getKey(value: (Long, Long)): Long = value._1
+        }
+      )
+      .flatMap(new StatefulFlatMapper)
+      .addSink(new AccumulatorCountingSink)
+
+    restoreAndExecute(
+      env,
+      SavepointMigrationTestBase.getResourceFilename(
+        s"stateful-scala${StatefulJobSavepointMigrationITCase.SCALA_VERSION}" +
+          s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
+          s"-${migrationVersionAndBackend._2}-savepoint"),
+      new Tuple2(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
+    )
+  }
+
+  @SerialVersionUID(1L)
+  private object CheckpointedSource {
+    var CHECKPOINTED_STRING = "Here be dragons!"
+  }
+
+  @SerialVersionUID(1L)
+  private class CheckpointedSource(val numElements: Int)
+      extends SourceFunction[(Long, Long)] with CheckpointedFunction {
+
+    private var isRunning = true
+    private var state: ListState[CustomCaseClass] = _
+
+    @throws[Exception]
+    override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
+      ctx.emitWatermark(new Watermark(0))
+      ctx.getCheckpointLock synchronized {
+        var i = 0
+        while (i < numElements) {
+            ctx.collect(i, i)
+            i += 1
+        }
+      }
+      // don't emit a final watermark so that we don't trigger the registered event-time
+      // timers
+      while (isRunning) Thread.sleep(20)
+    }
+
+    def cancel() {
+      isRunning = false
+    }
+
+    override def initializeState(context: FunctionInitializationContext): Unit = {
+      state = context.getOperatorStateStore.getOperatorState(
+        new ListStateDescriptor[CustomCaseClass](
+          "sourceState", createTypeInformation[CustomCaseClass]))
+    }
+
+    override def snapshotState(context: FunctionSnapshotContext): Unit = {
+      state.clear()
+      state.add(CustomCaseClass("Here be dragons!", 123))
+    }
+  }
+
+  @SerialVersionUID(1L)
+  private object AccumulatorCountingSink {
+    var NUM_ELEMENTS_ACCUMULATOR = classOf[AccumulatorCountingSink[_]] + "_NUM_ELEMENTS"
+  }
+
+  @SerialVersionUID(1L)
+  private class AccumulatorCountingSink[T] extends RichSinkFunction[T] {
+
+    private var count: Int = 0
+
+    @throws[Exception]
+    override def open(parameters: Configuration) {
+      super.open(parameters)
+      getRuntimeContext.addAccumulator(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, new IntCounter)
+    }
+
+    @throws[Exception]
+    def invoke(value: T) {
+      count += 1
+      getRuntimeContext.getAccumulator(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR).add(1)
+    }
+  }
+
+  class StatefulFlatMapper extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
+
+    private var caseClassState: ValueState[CustomCaseClass] = _
+    private var caseClassWithNestingState: ValueState[CustomCaseClassWithNesting] = _
+    private var collectionState: ValueState[List[CustomCaseClass]] = _
+    private var tryState: ValueState[Try[CustomCaseClass]] = _
+    private var tryFailureState: ValueState[Try[CustomCaseClass]] = _
+    private var optionState: ValueState[Option[CustomCaseClass]] = _
+    private var optionNoneState: ValueState[Option[CustomCaseClass]] = _
+    private var eitherLeftState: ValueState[Either[CustomCaseClass, String]] = _
+    private var eitherRightState: ValueState[Either[CustomCaseClass, String]] = _
+    private var enumOneState: ValueState[CustomEnum] = _
+    private var enumThreeState: ValueState[CustomEnum] = _
+
+    override def open(parameters: Configuration): Unit = {
+      caseClassState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomCaseClass](
+          "caseClassState", createTypeInformation[CustomCaseClass]))
+      caseClassWithNestingState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomCaseClassWithNesting](
+          "caseClassWithNestingState", createTypeInformation[CustomCaseClassWithNesting]))
+      collectionState = getRuntimeContext.getState(
+        new ValueStateDescriptor[List[CustomCaseClass]](
+          "collectionState", createTypeInformation[List[CustomCaseClass]]))
+      tryState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Try[CustomCaseClass]](
+          "tryState", createTypeInformation[Try[CustomCaseClass]]))
+      tryFailureState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Try[CustomCaseClass]](
+          "tryFailureState", createTypeInformation[Try[CustomCaseClass]]))
+      optionState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Option[CustomCaseClass]](
+          "optionState", createTypeInformation[Option[CustomCaseClass]]))
+      optionNoneState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Option[CustomCaseClass]](
+          "optionNoneState", createTypeInformation[Option[CustomCaseClass]]))
+      eitherLeftState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Either[CustomCaseClass, String]](
+          "eitherLeftState", createTypeInformation[Either[CustomCaseClass, String]]))
+      eitherRightState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Either[CustomCaseClass, String]](
+          "eitherRightState", createTypeInformation[Either[CustomCaseClass, String]]))
+      enumOneState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomEnum](
+          "enumOneState", createTypeInformation[CustomEnum]))
+      enumThreeState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomEnum](
+          "enumThreeState", createTypeInformation[CustomEnum]))
+    }
+
+    override def flatMap(in: (Long, Long), collector: Collector[(Long, Long)]): Unit = {
+      caseClassState.update(CustomCaseClass(in._1.toString, in._2 * 2))
+      caseClassWithNestingState.update(
+        CustomCaseClassWithNesting(in._1, CustomCaseClass(in._1.toString, in._2 * 2)))
+      collectionState.update(List(CustomCaseClass(in._1.toString, in._2 * 2)))
+      tryState.update(Try(CustomCaseClass(in._1.toString, in._2 * 5)))
+      tryFailureState.update(Failure(new RuntimeException))
+      optionState.update(Some(CustomCaseClass(in._1.toString, in._2 * 2)))
+      optionNoneState.update(None)
+      eitherLeftState.update(Left(CustomCaseClass(in._1.toString, in._2 * 2)))
+      eitherRightState.update(Right((in._1 * 3).toString))
+      enumOneState.update(CustomEnum.ONE)
+      enumOneState.update(CustomEnum.THREE)
+
+      collector.collect(in)
+    }
+  }
+
+}


Mime
View raw message