flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [01/15] flink git commit: [hotfix] [scala] Fix instantiation of Scala serializers' config snapshot classes
Date Tue, 13 Jun 2017 05:48:58 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 261bb3826 -> db975260c


[hotfix] [scala] Fix instantiation of Scala serializers' config snapshot classes

Prior to this commit, the configuration snapshot classes of Scala
serializers did not have the proper default empty constructor that is
used for deserializing the configuration snapshot.

Because some Scala serializers' config snapshots extend the Java
CompositeTypeSerializerConfigSnapshot, their config snapshot classes are
now implemented in Java: in Scala, a subclass can only call a single
base class constructor.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9cb728e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9cb728e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9cb728e2

Branch: refs/heads/release-1.3
Commit: 9cb728e2021132528f4f40e891eb46656de8bd60
Parents: 261bb38
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Thu Jun 8 08:52:04 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Jun 13 07:20:12 2017 +0200

----------------------------------------------------------------------
 .../ScalaOptionSerializerConfigSnapshot.java    | 47 +++++++++++++
 .../ScalaTrySerializerConfigSnapshot.java       | 50 ++++++++++++++
 .../TraversableSerializerConfigSnapshot.java    | 47 +++++++++++++
 .../scala/typeutils/EnumValueSerializer.scala   | 14 ++--
 .../api/scala/typeutils/OptionSerializer.scala  | 62 ++++++++++-------
 .../scala/typeutils/TraversableSerializer.scala | 25 ++-----
 .../api/scala/typeutils/TrySerializer.scala     | 72 +++++++++++---------
 7 files changed, 233 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9cb728e2/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
new file mode 100644
index 0000000..03eef12
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the Scala {@link OptionSerializer}.
+ *
+ * <p>This configuration snapshot class is implemented in Java because Scala does not
+ * allow calling different base class constructors from subclasses, while we need that
+ * for the default empty constructor.
+ */
+public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public ScalaOptionSerializerConfigSnapshot() {}
+
+	public ScalaOptionSerializerConfigSnapshot(TypeSerializer<E> elementSerializer) {
+		super(elementSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cb728e2/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
new file mode 100644
index 0000000..6abb3ea
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the Scala {@link TrySerializer}.
+ *
+ * <p>This configuration snapshot class is implemented in Java because Scala does not
+ * allow calling different base class constructors from subclasses, while we need that
+ * for the default empty constructor.
+ */
+public class ScalaTrySerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public ScalaTrySerializerConfigSnapshot() {}
+
+	public ScalaTrySerializerConfigSnapshot(
+			TypeSerializer<E> elementSerializer,
+			TypeSerializer<Throwable> throwableSerializer) {
+
+		super(elementSerializer, throwableSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cb728e2/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
new file mode 100644
index 0000000..9a39421
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the Scala {@link TraversableSerializer}.
+ *
+ * <p>This configuration snapshot class is implemented in Java because Scala does not
+ * allow calling different base class constructors from subclasses, while we need that
+ * for the default empty constructor.
+ */
+public class TraversableSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public TraversableSerializerConfigSnapshot() {}
+
+	public TraversableSerializerConfigSnapshot(TypeSerializer<E> elementSerializer) {
+		super(elementSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cb728e2/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 843079a..d549623 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerialize
 import org.apache.flink.api.common.typeutils.base.IntSerializer
 import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOutputViewStream}
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.{InstantiationUtil, Preconditions}
 
 /**
  * Serializer for [[Enumeration]] values.
@@ -111,13 +111,17 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
 
 object EnumValueSerializer {
 
-  class ScalaEnumSerializerConfigSnapshot[E <: Enumeration](private var enumClass: Class[E])
+  class ScalaEnumSerializerConfigSnapshot[E <: Enumeration]
       extends TypeSerializerConfigSnapshot {
 
-    var enumConstants: Array[E] = enumClass.getEnumConstants
+    var enumClass: Class[E] = _
+    var enumConstants: Array[E] = _
 
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null)
+    def this(enumClass: Class[E]) = {
+      this()
+      this.enumClass = Preconditions.checkNotNull(enumClass)
+      this.enumConstants = enumClass.getEnumConstants
+    }
 
     override def write(out: DataOutputView): Unit = {
       super.write(out)

http://git-wip-us.apache.org/repos/asf/flink/blob/9cb728e2/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 8adfb5c..810c91c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -100,46 +100,56 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): OptionSerializer.OptionSerializerConfigSnapshot[A] = {
-    new OptionSerializer.OptionSerializerConfigSnapshot(elemSerializer)
+  override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = {
+    new ScalaOptionSerializerConfigSnapshot[A](elemSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Option[A]] = {
+
     configSnapshot match {
       case optionSerializerConfigSnapshot
+          : ScalaOptionSerializerConfigSnapshot[A] =>
+        ensureCompatibility(optionSerializerConfigSnapshot)
+      case legacyOptionSerializerConfigSnapshot
           : OptionSerializer.OptionSerializerConfigSnapshot[A] =>
-        val compatResult = CompatibilityUtil.resolveCompatibilityResult(
-          optionSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          optionSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
-          elemSerializer)
-
-        if (compatResult.isRequiresMigration) {
-          if (compatResult.getConvertDeserializer != null) {
-            CompatibilityResult.requiresMigration(
-              new OptionSerializer[A](
-                new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
-          } else {
-            CompatibilityResult.requiresMigration()
-          }
-        } else {
-          CompatibilityResult.compatible()
-        }
-
+        ensureCompatibility(legacyOptionSerializerConfigSnapshot)
       case _ => CompatibilityResult.requiresMigration()
     }
   }
+
+  private def ensureCompatibility(
+      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot)
+      : CompatibilityResult[Option[A]] = {
+
+    val compatResult = CompatibilityUtil.resolveCompatibilityResult(
+      compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
+      elemSerializer)
+
+    if (compatResult.isRequiresMigration) {
+      if (compatResult.getConvertDeserializer != null) {
+        CompatibilityResult.requiresMigration(
+          new OptionSerializer[A](
+            new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
+      } else {
+        CompatibilityResult.requiresMigration()
+      }
+    } else {
+      CompatibilityResult.compatible()
+    }
+  }
 }
 
 object OptionSerializer {
 
-  class OptionSerializerConfigSnapshot[A](
-      private val elemSerializer: TypeSerializer[A])
-    extends CompositeTypeSerializerConfigSnapshot(elemSerializer) {
-
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null)
+  /**
+    * We need to keep this to be compatible with snapshots taken in Flink 1.3.0.
+    * Once Flink 1.3.x is no longer supported, this can be removed.
+    */
+  class OptionSerializerConfigSnapshot[A]()
+      extends CompositeTypeSerializerConfigSnapshot {
 
     override def getVersion: Int = OptionSerializerConfigSnapshot.VERSION
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/9cb728e2/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 6299a24..5963987 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -151,16 +151,16 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
     obj.isInstanceOf[TraversableSerializer[_, _]]
   }
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
-    new TraversableSerializer.TraversableSerializerConfigSnapshot[E](elementSerializer)
+  override def snapshotConfiguration(): TraversableSerializerConfigSnapshot[E] = {
+    new TraversableSerializerConfigSnapshot[E](elementSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
 
     configSnapshot match {
-      case traversableSerializerConfigSnapshot:
-          TraversableSerializer.TraversableSerializerConfigSnapshot[E] =>
+      case traversableSerializerConfigSnapshot
+          : TraversableSerializerConfigSnapshot[E] =>
 
         val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
           traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
@@ -178,20 +178,3 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
     }
   }
 }
-
-object TraversableSerializer {
-
-  class TraversableSerializerConfigSnapshot[E](
-      private var elementSerializer: TypeSerializer[E])
-    extends CompositeTypeSerializerConfigSnapshot(elementSerializer) {
-
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null)
-
-    override def getVersion = TraversableSerializerConfigSnapshot.VERSION
-  }
-
-  object TraversableSerializerConfigSnapshot {
-    val VERSION = 1
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cb728e2/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index 641caa1..a88cce7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -103,51 +103,59 @@ class TrySerializer[A](
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
-    new TrySerializer.TrySerializerConfigSnapshot[A](elemSerializer, throwableSerializer)
+  override def snapshotConfiguration(): ScalaTrySerializerConfigSnapshot[A] = {
+    new ScalaTrySerializerConfigSnapshot[A](elemSerializer, throwableSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Try[A]] = {
 
     configSnapshot match {
-      case trySerializerConfigSnapshot: TrySerializer.TrySerializerConfigSnapshot[A] =>
-        val previousSerializersAndConfigs =
-          trySerializerConfigSnapshot.getNestedSerializersAndConfigs
-
-        val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
-          previousSerializersAndConfigs.get(0).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousSerializersAndConfigs.get(0).f1,
-          elemSerializer)
-
-        val throwableCompatRes = CompatibilityUtil.resolveCompatibilityResult(
-          previousSerializersAndConfigs.get(1).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousSerializersAndConfigs.get(1).f1,
-          throwableSerializer)
-
-        if (elemCompatRes.isRequiresMigration || throwableCompatRes.isRequiresMigration) {
-          CompatibilityResult.requiresMigration()
-        } else {
-          CompatibilityResult.compatible()
-        }
-
+      case trySerializerConfigSnapshot
+          : ScalaTrySerializerConfigSnapshot[A] =>
+        ensureCompatibility(trySerializerConfigSnapshot)
+      case legacyTrySerializerConfigSnapshot
+          : TrySerializer.TrySerializerConfigSnapshot[A] =>
+        ensureCompatibility(legacyTrySerializerConfigSnapshot)
       case _ => CompatibilityResult.requiresMigration()
     }
   }
+
+  private def ensureCompatibility(
+      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot)
+      : CompatibilityResult[Option[A]] = {
+
+    val previousSerializersAndConfigs =
+      compositeConfigSnapshot.getNestedSerializersAndConfigs
+
+    val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+      previousSerializersAndConfigs.get(0).f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      previousSerializersAndConfigs.get(0).f1,
+      elemSerializer)
+
+    val throwableCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+      previousSerializersAndConfigs.get(1).f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      previousSerializersAndConfigs.get(1).f1,
+      throwableSerializer)
+
+    if (elemCompatRes.isRequiresMigration || throwableCompatRes.isRequiresMigration) {
+      CompatibilityResult.requiresMigration()
+    } else {
+      CompatibilityResult.compatible()
+    }
+  }
 }
 
 object TrySerializer {
 
-  class TrySerializerConfigSnapshot[A](
-      private var elemSerializer: TypeSerializer[A],
-      private var throwableSerializer: TypeSerializer[Throwable])
-    extends CompositeTypeSerializerConfigSnapshot(
-      elemSerializer, throwableSerializer) {
-
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null, null)
+  /**
+    * We need to keep this to be compatible with snapshots taken in Flink 1.3.0.
+    * Once Flink 1.3.x is no longer supported, this can be removed.
+    */
+  class TrySerializerConfigSnapshot[A]()
+      extends CompositeTypeSerializerConfigSnapshot() {
 
     override def getVersion: Int = TrySerializerConfigSnapshot.VERSION
   }


Mime
View raw message