flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [4/5] flink git commit: [FLINK-6602] [table] Prevent TableSources with empty time attribute names.
Date Mon, 19 Jun 2017 22:18:51 GMT
[FLINK-6602] [table] Prevent TableSources with empty time attribute names.

This closes #4135.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7f6d024
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7f6d024
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7f6d024

Branch: refs/heads/release-1.3
Commit: c7f6d0245cd0d47a99d9badcd43f1ebb8758125e
Parents: 8b91df2
Author: zhe li <zhe li>
Authored: Fri Jun 16 23:13:09 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Jun 19 22:43:10 2017 +0200

----------------------------------------------------------------------
 .../plan/schema/StreamTableSourceTable.scala    | 16 +++++++++++--
 .../api/scala/stream/TableSourceTest.scala      | 24 +++++++++++++++++++-
 2 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c7f6d024/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index fa15288..408381d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -39,7 +39,13 @@ class StreamTableSourceTable[T](
     val fieldCnt = fieldNames.length
 
     val rowtime = tableSource match {
-      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
+      case nullTimeSource : DefinedRowtimeAttribute
+        if nullTimeSource.getRowtimeAttribute == null =>
+          None
+      case emptyStringTimeSource: DefinedRowtimeAttribute
+        if emptyStringTimeSource.getRowtimeAttribute.trim.equals("")  =>
+          throw TableException("The name of the rowtime attribute must not be empty.")
+      case timeSource: DefinedRowtimeAttribute  =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
         Some((fieldCnt, rowtimeAttribute))
       case _ =>
@@ -47,7 +53,13 @@ class StreamTableSourceTable[T](
     }
 
     val proctime = tableSource match {
-      case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
+      case nullTimeSource : DefinedProctimeAttribute
+        if nullTimeSource.getProctimeAttribute == null =>
+          None
+      case emptyStringTimeSource: DefinedProctimeAttribute
+        if emptyStringTimeSource.getProctimeAttribute.trim.equals("")  =>
+          throw TableException("The name of the proctime attribute must not be empty.")
+      case timeSource: DefinedProctimeAttribute  =>
         val proctimeAttribute = timeSource.getProctimeAttribute
         Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
       case _ =>

http://git-wip-us.apache.org/repos/asf/flink/blob/c7f6d024/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
index 890ad32..32961a6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
 import org.apache.flink.table.utils.TableTestBase
@@ -121,6 +121,28 @@ class TableSourceTest extends TableTestBase {
       )
     util.verifyTable(t, expected)
   }
+
+  @Test(expected = classOf[TableException])
+  def testRowtimeTableSourceWithEmptyName(): Unit = {
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource(" "))
+
+    val t = util.tEnv.scan("rowTimeT")
+      .select('id)
+
+    util.tEnv.optimize(t.getRelNode, false)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testProctimeTableSourceWithEmptyName(): Unit = {
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("procTimeT", new TestProctimeSource(" "))
+
+    val t = util.tEnv.scan("procTimeT")
+      .select('id)
+
+    util.tEnv.optimize(t.getRelNode, false)
+  }
 }
 
 class TestRowtimeSource(timeField: String)


Mime
View raw message