flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [2/2] flink git commit: [FLINK-7596] [table] Restrict equality and improve tests
Date Tue, 26 Sep 2017 14:58:27 GMT
[FLINK-7596] [table] Restrict equality and improve tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/314ca04b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/314ca04b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/314ca04b

Branch: refs/heads/release-1.3
Commit: 314ca04b3a06b960ac324484b099fd66ffcb2fbd
Parents: 701be5d
Author: twalthr <twalthr@apache.org>
Authored: Tue Sep 26 16:37:31 2017 +0200
Committer: twalthr <twalthr@apache.org>
Committed: Tue Sep 26 16:52:29 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 45 +++++++++-----------
 .../datastream/DataStreamCalcITCase.scala       | 16 ++++---
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/314ca04b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 82dd390..853c3e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.calcite
 
 import java.util
-import java.util.List
 
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
@@ -273,39 +272,35 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     canonize(newType)
   }
 
-  private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = {
-    val hasAny = types.asScala.map(_.getSqlTypeName).exists(_ == SqlTypeName.ANY)
-    val nullable = types.asScala.exists(
-      sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
-    )
-    if (hasAny) {
-      if (types.get(0).isInstanceOf[GenericRelDataType] &&
-        types.get(1).isInstanceOf[GenericRelDataType]) {
-        createTypeWithNullability(types.get(0), nullable)
-      } else {
-        throw new RuntimeException("only GenericRelDataType of ANY is supported")
-      }
-    } else {
-      null
-    }
-  }
-
   override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
-    assert(types != null)
-    assert(types.size >= 1)
     val type0 = types.get(0)
     if (type0.getSqlTypeName != null) {
-      val resultType = resolveAnySqlType(types)
+      val resultType = resolveAny(types)
       if (resultType != null) {
-        resultType
+        return resultType
+      }
+    }
+    super.leastRestrictive(types)
+  }
+
+  private def resolveAny(types: util.List[RelDataType]): RelDataType = {
+    val allTypes = types.asScala
+    val hasAny = allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)
+    if (hasAny) {
+      val head = allTypes.head
+      // only allow ANY with exactly the same GenericRelDataType for all types
+      if (allTypes.forall(_ == head)) {
+        val nullable = allTypes.exists(
+          sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
+        )
+        createTypeWithNullability(head, nullable)
       } else {
-        super.leastRestrictive(types)
+        throw TableException("Generic ANY types must have a common type information.")
       }
     } else {
-      super.leastRestrictive(types)
+      null
     }
   }
-
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/314ca04b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
index c37a728..e7fb15b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
@@ -81,18 +81,24 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testUnionWithAnyType(): Unit = {
-    val list = List((1, new NODE), (2, new NODE))
-    val list2 = List((3, new NODE), (4, new NODE))
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    val s1 = tEnv.fromDataStream(env.fromCollection(list))
-    val s2 = tEnv.fromDataStream(env.fromCollection(list2))
+
+    StreamITCase.testResults = mutable.MutableList()
+    val s1 = env.fromElements((1, new NonPojo), (2, new NonPojo)).toTable(tEnv, 'a, 'b)
+    val s2 = env.fromElements((3, new NonPojo), (4, new NonPojo)).toTable(tEnv, 'a, 'b)
+
     val result = s1.unionAll(s2).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
+
+    val expected = mutable.MutableList("1,{}", "2,{}", "3,{}", "4,{}")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  class NODE {
+  class NonPojo {
     val x = new java.util.HashMap[String, String]()
+
+    override def toString: String = x.toString
   }
 }


Mime
View raw message