flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [05/12] flink git commit: [FLINK-6844] [scala] Implement compatibility methods for TraversableSerializer
Date Wed, 07 Jun 2017 16:30:02 GMT
[FLINK-6844] [scala] Implement compatibility methods for TraversableSerializer

This closes #4081.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c11d5ed5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c11d5ed5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c11d5ed5

Branch: refs/heads/master
Commit: c11d5ed5388a5a30ca4ea0c5ac68e22e5989cb54
Parents: bdffde3
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Mon Jun 5 20:52:57 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Wed Jun 7 18:28:58 2017 +0200

----------------------------------------------------------------------
 .../scala/typeutils/TraversableSerializer.scala | 41 ++++++++++++++++++--
 1 file changed, 38 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c11d5ed5/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 1ac46f9..6299a24 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.typeutils
 import java.io.ObjectInputStream
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.api.common.typeutils._
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 import scala.collection.generic.CanBuildFrom
@@ -152,11 +152,46 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
   }
 
   override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
-    throw new UnsupportedOperationException()
+    new TraversableSerializer.TraversableSerializerConfigSnapshot[E](elementSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
-    throw new UnsupportedOperationException()
+
+    configSnapshot match {
+      case traversableSerializerConfigSnapshot:
+          TraversableSerializer.TraversableSerializerConfigSnapshot[E] =>
+
+        val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+          traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
+          classOf[UnloadableDummyTypeSerializer[_]],
+          traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
+          elementSerializer)
+
+        if (elemCompatRes.isRequiresMigration) {
+          CompatibilityResult.requiresMigration()
+        } else {
+          CompatibilityResult.compatible()
+        }
+
+      case _ => CompatibilityResult.requiresMigration()
+    }
+  }
+}
+
+object TraversableSerializer {
+
+  class TraversableSerializerConfigSnapshot[E](
+      private var elementSerializer: TypeSerializer[E])
+    extends CompositeTypeSerializerConfigSnapshot(elementSerializer) {
+
+    /** This empty nullary constructor is required for deserializing the configuration. */
+    def this() = this(null)
+
+    override def getVersion = TraversableSerializerConfigSnapshot.VERSION
+  }
+
+  object TraversableSerializerConfigSnapshot {
+    val VERSION = 1
   }
 }


Mime
View raw message