flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-2231] Create Serializer and Comparator for Scala Enumerations.
Date Wed, 29 Jul 2015 12:32:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3e49daf26 -> a34c94167


[FLINK-2231] Create Serializer and Comparator for Scala Enumerations.

This closes #935


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a34c9416
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a34c9416
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a34c9416

Branch: refs/heads/master
Commit: a34c9416790e0cddedb3f2518fd0bea2331cbcc0
Parents: 3e49daf
Author: Alexander Alexandrov <alexander.s.alexandrov@gmail.com>
Authored: Fri Jul 24 20:57:04 2015 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Jul 29 14:24:56 2015 +0200

----------------------------------------------------------------------
 .../flink/api/scala/codegen/TypeAnalyzer.scala  |  38 ++++++-
 .../api/scala/codegen/TypeDescriptors.scala     |   6 +-
 .../api/scala/codegen/TypeInformationGen.scala  |  18 +++-
 .../scala/typeutils/EnumValueComparator.scala   | 105 +++++++++++++++++++
 .../scala/typeutils/EnumValueSerializer.scala   |  65 ++++++++++++
 .../api/scala/typeutils/EnumValueTypeInfo.scala |  52 +++++++++
 .../scala/runtime/EnumValueComparatorTest.scala |  53 ++++++++++
 .../ScalaSpecialTypesSerializerTest.scala       |  27 +++--
 8 files changed, 348 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a34c9416/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
index 6ad73a5..0421557 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
@@ -28,7 +28,6 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
   with TypeDescriptors[C] =>
 
   import c.universe._
-  import compat._
 
   // This value is controlled by the udtRecycling compiler option
   var enableMutableUDTs = false
@@ -61,6 +60,8 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
 
           case EitherType(leftTpe, rightTpe) => analyzeEither(id, tpe, leftTpe, rightTpe)
 
+          case EnumValueType(enum) => EnumValueDescriptor(id, tpe, enum)
+
           case TryType(elemTpe) => analyzeTry(id, tpe, elemTpe)
 
           case OptionType(elemTpe) => analyzeOption(id, tpe, elemTpe)
@@ -302,6 +303,8 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
           traversable match {
             case TypeRef(_, _, elemTpe :: Nil) =>
 
+              import compat._ // this is needed in order to compile in Scala 2.11
+
               // determine whether we can find an implicit for the CanBuildFrom because
               // TypeInformationGen requires this. This catches the case where a user
               // has a custom class that implements Iterable[], for example.
@@ -350,6 +353,37 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
       }
     }
 
+    private object EnumValueType {
+      def unapply(tpe: Type): Option[ModuleSymbol] = {
+        // somewhat hacky solution based on the 'org.example.MyEnum.Value' FQN
+        // convention, compatible with Scala 2.10
+        try {
+          val m = c.universe.rootMirror
+          // get fully-qualified type name, e.g. org.example.MyEnum.Value
+          val fqn = tpe.normalize.toString.split('.')
+          // get FQN parent
+          val owner = m.staticModule(fqn.slice(0, fqn.size - 1).mkString("."))
+
+          val enumerationSymbol = typeOf[scala.Enumeration].typeSymbol
+          if (owner.typeSignature.baseClasses.contains(enumerationSymbol)) {
+            Some(owner)
+          } else {
+            None
+          }
+        } catch {
+          case e: Throwable => None
+        }
+        // TODO: use this once 2.10 is no longer supported
+        // tpe is the Enumeration.Value alias, get the owner
+        // val owner = tpe.typeSymbol.owner
+        // if (owner.isModule &&
+        //     owner.typeSignature.baseClasses.contains(typeOf[scala.Enumeration].typeSymbol))
+        //   Some(owner.asModule)
+        // else
+        //   None
+      }
+    }
+
     private object TryType {
       def unapply(tpe: Type): Option[Type] = {
         if (tpe <:< typeOf[scala.util.Try[_]]) {
@@ -441,7 +475,7 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
 
     def getBoxInfo(prim: Symbol, primName: String, boxName: String) = {
       val (default, wrapper) = primitives(prim)
-      val box = { t: Tree => 
+      val box = { t: Tree =>
         Apply(
           Select(
             Select(Ident(newTermName("scala")), newTermName("Predef")),

http://git-wip-us.apache.org/repos/asf/flink/blob/a34c9416/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
index c6006a2..b65defd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
@@ -18,11 +18,7 @@
 package org.apache.flink.api.scala.codegen
 
 import scala.language.postfixOps
-
 import scala.reflect.macros.Context
-import scala.reflect.classTag
-import scala.reflect.ClassTag
-import scala.Option.option2Iterable
 
 // These are only used internally while analyzing Scala types in TypeAnalyzer and TypeInformationGen
 
@@ -48,6 +44,8 @@ private[flink] trait TypeDescriptors[C <: Context] { this: MacroContextHolder[C]
   case class EitherDescriptor(id: Int, tpe: Type, left: UDTDescriptor, right: UDTDescriptor)
     extends UDTDescriptor
 
+  case class EnumValueDescriptor(id: Int, tpe: Type, enum: ModuleSymbol) extends UDTDescriptor
+
   case class TryDescriptor(id: Int, tpe: Type, elem: UDTDescriptor) extends UDTDescriptor
 
   case class OptionDescriptor(id: Int, tpe: Type, elem: UDTDescriptor) extends UDTDescriptor

http://git-wip-us.apache.org/repos/asf/flink/blob/a34c9416/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index a6fbb71..07f7205 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -21,9 +21,7 @@ import java.lang.reflect.{Field, Modifier}
 
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo._
-
 import org.apache.flink.api.common.typeutils._
-import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.types.Value
@@ -32,7 +30,6 @@ import org.apache.hadoop.io.Writable
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.postfixOps
-
 import scala.reflect.macros.Context
 
 private[flink] trait TypeInformationGen[C <: Context] {
@@ -66,6 +63,8 @@ private[flink] trait TypeInformationGen[C <: Context] {
 
     case e: EitherDescriptor => mkEitherTypeInfo(e)
 
+    case e: EnumValueDescriptor => mkEnumValueTypeInfo(e)
+
     case tr: TryDescriptor => mkTryTypeInfo(tr)
 
     case o: OptionDescriptor => mkOptionTypeInfo(o)
@@ -150,6 +149,19 @@ private[flink] trait TypeInformationGen[C <: Context] {
     c.Expr[TypeInformation[T]](result)
   }
 
+  def mkEnumValueTypeInfo[T: c.WeakTypeTag](d: EnumValueDescriptor): c.Expr[TypeInformation[T]] = {
+
+    val enumValueClass = c.Expr[Class[T]](Literal(Constant(weakTypeOf[T])))
+
+    val result = q"""
+      import org.apache.flink.api.scala.typeutils.EnumValueTypeInfo
+
+      new EnumValueTypeInfo[${d.enum.typeSignature}](${d.enum}, $enumValueClass)
+    """
+
+    c.Expr[TypeInformation[T]](result)
+  }
+
   def mkTryTypeInfo[T: c.WeakTypeTag](desc: TryDescriptor): c.Expr[TypeInformation[T]] = {
 
     val elemTypeInfo = mkTypeInfo(desc.elem)(c.WeakTypeTag(desc.elem.tpe))

http://git-wip-us.apache.org/repos/asf/flink/blob/a34c9416/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueComparator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueComparator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueComparator.scala
new file mode 100644
index 0000000..4aa0a73
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueComparator.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeutils.base.IntComparator
+import org.apache.flink.core.memory.{DataOutputView, DataInputView, MemorySegment}
+
+/**
+ * Comparator for [[Enumeration]] values.
+ */
+@SerialVersionUID(1000L)
+class EnumValueComparator[E <: Enumeration](ascComp: Boolean) extends TypeComparator[E#Value] {
+
+  type T = E#Value
+
+  final val intComparator = new IntComparator(ascComp)
+
+  // We cannot use the Clone Constructor from Scala so we have to do it manually
+  def duplicate: TypeComparator[T] = {
+    new EnumValueComparator[E](ascComp)
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Comparator Methods
+  // --------------------------------------------------------------------------------------------
+
+  override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
+    intComparator.compareSerialized(firstSource, secondSource)
+  }
+
+  def supportsNormalizedKey: Boolean = {
+    intComparator.supportsNormalizedKey
+  }
+
+  def getNormalizeKeyLen: Int = {
+    intComparator.getNormalizeKeyLen
+  }
+
+  def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean = {
+    intComparator.isNormalizedKeyPrefixOnly(keyBytes)
+  }
+
+  override def putNormalizedKey(v: T, target: MemorySegment, offset: Int, numBytes: Int): Unit = {
+    intComparator.putNormalizedKey(v.id, target, offset, numBytes)
+  }
+
+  override def hash(record: T): Int = intComparator.hash(record.id)
+
+  override def setReference(toCompare: T): Unit = {
+    intComparator.setReference(toCompare.id)
+  }
+
+  override def equalToReference(candidate: T): Boolean = {
+    intComparator.equalToReference(candidate.id)
+  }
+
+  override def compareToReference(referencedComparator: TypeComparator[T]): Int = {
+    intComparator.compareToReference(referencedComparator.asInstanceOf[this.type].intComparator)
+  }
+
+  override def compare(first: E#Value, second: E#Value): Int = {
+    intComparator.compare(first.id, second.id)
+  }
+
+  override def invertNormalizedKey(): Boolean = {
+    intComparator.invertNormalizedKey()
+  }
+
+  override def writeWithKeyNormalization(record: T, target: DataOutputView): Unit = {
+    intComparator.writeWithKeyNormalization(record.id, target)
+  }
+
+  override def supportsSerializationWithKeyNormalization(): Boolean = {
+    intComparator.supportsSerializationWithKeyNormalization()
+  }
+
+  override def extractKeys(record: AnyRef, target: Array[AnyRef], index: Int): Int = {
+    target(index) = record
+    1
+  }
+
+  override lazy val getFlatComparators: Array[TypeComparator[_]] = {
+    Array(this)
+  }
+
+  override def readWithKeyDenormalization(reuse: T, source: DataInputView): T = {
+    throw new UnsupportedOperationException
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a34c9416/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
new file mode 100644
index 0000000..8d03676
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.IntSerializer
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+/**
+ * Serializer for [[Enumeration]] values.
+ */
+class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[E#Value] {
+
+  type T = E#Value
+
+  val intSerializer = new IntSerializer()
+
+  override def duplicate: EnumValueSerializer[E] = this
+
+  override def createInstance: T = enum(0)
+
+  override def isImmutableType: Boolean = true
+
+  override def getLength: Int = intSerializer.getLength
+
+  override def copy(from: T): T = enum.apply(from.id)
+
+  override def copy(from: T, reuse: T): T = copy(from)
+
+  override def copy(src: DataInputView, tgt: DataOutputView): Unit = intSerializer.copy(src, tgt)
+
+  override def serialize(v: T, tgt: DataOutputView): Unit = intSerializer.serialize(v.id, tgt)
+
+  override def deserialize(source: DataInputView): T = enum(intSerializer.deserialize(source))
+
+  override def deserialize(reuse: T, source: DataInputView): T = deserialize(source)
+
+  override def equals(obj: Any): Boolean = {
+    if (obj != null && obj.isInstanceOf[EnumValueSerializer[_]]) {
+      val other = obj.asInstanceOf[EnumValueSerializer[_]]
+      this.enum == other.enum
+    } else {
+      false
+    }
+  }
+
+  override def hashCode(): Int = {
+    enum.hashCode()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a34c9416/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
new file mode 100644
index 0000000..c66e4bc
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+
+import scala.collection.JavaConverters._
+
+/**
+ * TypeInformation for [[Enumeration]] values.
+ */
+class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value])
+  extends TypeInformation[E#Value] with AtomicType[E#Value] {
+
+  type T = E#Value
+
+  override def isBasicType: Boolean = false
+  override def isTupleType: Boolean = false
+  override def isKeyType: Boolean = true
+  override def getTotalFields: Int = 1
+  override def getArity: Int = 1
+  override def getTypeClass = clazz
+  override def getGenericParameters = List.empty[TypeInformation[_]].asJava
+
+
+  def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
+    new EnumValueSerializer[E](enum)
+  }
+
+  override def createComparator(ascOrder: Boolean, config: ExecutionConfig): TypeComparator[T] = {
+    new EnumValueComparator[E](ascOrder)
+  }
+
+  override def toString = clazz.getCanonicalName
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a34c9416/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/EnumValueComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/EnumValueComparatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/EnumValueComparatorTest.scala
new file mode 100644
index 0000000..a0cc03f
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/EnumValueComparatorTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.EnumValueTypeInfo
+
+
+class EnumValueComparatorTest extends ComparatorTestBase[Suit.Value] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[Suit.Value] = {
+    val ti = createTypeInformation[Suit.Value]
+    ti.asInstanceOf[EnumValueTypeInfo[Suit.type]].createComparator(ascending, new ExecutionConfig)
+  }
+
+  protected def createSerializer: TypeSerializer[Suit.Value] = {
+    val ti = createTypeInformation[Suit.Value]
+    ti.createSerializer(new ExecutionConfig)
+  }
+
+  protected def getSortedTestData: Array[Suit.Value] = {
+    dataISD
+  }
+
+  private val dataISD = Array(
+    Suit.Clubs,
+    Suit.Diamonds,
+    Suit.Hearts,
+    Suit.Spades
+  )
+}
+
+object Suit extends Enumeration {
+  type Suit = Value
+  val Clubs, Diamonds, Hearts, Spades = Value
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a34c9416/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
index e4454b7..155160c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
@@ -18,15 +18,14 @@
 package org.apache.flink.api.scala.runtime
 
 import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer}
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.EnumValueTypeInfo
 import org.junit.Assert._
-
-import org.apache.flink.api.common.typeutils.{TypeSerializer, SerializerTestInstance}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.junit.{Assert, Test}
 
-import org.apache.flink.api.scala._
-
 import scala.util.{Failure, Success}
 
 class ScalaSpecialTypesSerializerTest {
@@ -68,6 +67,12 @@ class ScalaSpecialTypesSerializerTest {
   }
 
   @Test
+  def testEnumValue(): Unit = {
+    val testData = Array(WeekDay.Mon, WeekDay.Fri, WeekDay.Tue, WeekDay.Sun, WeekDay.Wed)
+    runTests(testData)
+  }
+
+  @Test
   def testTry(): Unit = {
     val testData = Array(Success("Hell"), Failure(new RuntimeException("test")))
     runTests(testData)
@@ -93,8 +98,11 @@ class ScalaSpecialTypesSerializerTest {
       val typeInfo = implicitly[TypeInformation[T]]
       val serializer = typeInfo.createSerializer(new ExecutionConfig)
       val typeClass = typeInfo.getTypeClass
-      val test =
-        new ScalaSpecialTypesSerializerTestInstance[T](serializer, typeClass, -1, instances)
+      val test = new ScalaSpecialTypesSerializerTestInstance[T](
+        serializer,
+        typeClass,
+        serializer.getLength,
+        instances)
       test.testAll()
     } catch {
       case e: Exception => {
@@ -159,3 +167,8 @@ class ScalaSpecialTypesSerializerTestInstance[T](
   }
 }
 
+object WeekDay extends Enumeration {
+  type WeekDay = Value
+  val Mon, Tue, Wed, Thu, Fri, Sat, Sun = Value
+}
+


Mime
View raw message