flink-commits mailing list archives

From fhue...@apache.org
Subject flink git commit: [FLINK-7939] [table] Fix Table conversion of DataStream of AtomicType.
Date Wed, 01 Nov 2017 10:55:42 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 c3289c9d9 -> 168378d98


[FLINK-7939] [table] Fix Table conversion of DataStream of AtomicType.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/168378d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/168378d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/168378d9

Branch: refs/heads/release-1.3
Commit: 168378d98ddf591f780a939ee74310ec8d04d517
Parents: c3289c9
Author: Fabian Hueske <fhueske@apache.org>
Authored: Sat Oct 28 22:13:23 2017 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Nov 1 09:43:33 2017 +0100

----------------------------------------------------------------------
 .../flink/table/api/TableEnvironment.scala      |   9 +-
 .../flink/table/plan/schema/FlinkTable.scala    |   8 +-
 .../stream/StreamTableEnvironmentTest.scala     |  37 +++++
 .../datastream/TimeAttributesITCase.scala       | 159 ++++++++++++++-----
 4 files changed, 168 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
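
For orientation, a minimal usage sketch of the conversion this fix enables, modeled on the test cases added below. The object name, field names, and sample data are illustrative and not part of the patch:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.{TableEnvironment, Types}
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    // Sketch only: a DataStream of an atomic type (String) becomes a Table with the
    // single value field 's plus an appended processing-time attribute 'pt. Before
    // this fix, any field expression beyond the first was rejected for atomic input.
    object AtomicTypeConversionSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        val words = env.fromElements("hello", "world")
        val table = words.toTable(tEnv, 's, 'pt.proctime)

        // Keep all rows (the proctime cast mirrors the predicate used in the tests
        // below) and emit the value field.
        table
          .where('pt.cast(Types.LONG) > 0)
          .select('s)
          .toAppendStream[Row]
          .print()

        env.execute()
      }
    }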


http://git-wip-us.apache.org/repos/asf/flink/blob/168378d9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 4ceeece..37bcd0b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -622,10 +622,13 @@ abstract class TableEnvironment(val config: TableConfig) {
             "Please specify the type of the input with a RowTypeInfo.")
       case a: AtomicType[_] =>
         exprs.zipWithIndex flatMap {
+          case (_: TimeAttribute, _) =>
+            None
+          case (UnresolvedFieldReference(name), idx) if idx > 0 =>
+            // only accept the first field for an atomic type
+            throw new TableException("Only the first field can reference an atomic type.")
           case (UnresolvedFieldReference(name), idx) =>
-            if (idx > 0) {
-              throw new TableException("Table of atomic type can only have a single field.")
-            }
+            // first field reference is mapped to atomic type
             Some((0, name))
           case _ => throw new TableException("Field reference expression requested.")
         }
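
Restated on its own, the mapping above says: for an atomic input type, time attribute expressions ('x.rowtime / 'x.proctime) are skipped, a plain field reference is only accepted as the very first expression and is bound to the atomic value (index 0), and any later plain reference fails. A hedged standalone sketch of that rule, using simplified stand-ins for the Table API expression classes:

    import org.apache.flink.table.api.TableException

    object AtomicFieldMappingSketch {

      // Simplified stand-ins for the expression classes matched in the patch above.
      sealed trait FieldExpr
      case class FieldRef(name: String) extends FieldExpr  // like UnresolvedFieldReference
      case object TimeAttr extends FieldExpr                // like 'x.rowtime / 'x.proctime

      // Mirrors the patched flatMap: skip time attributes, map the leading field
      // reference to index 0 of the atomic value, reject any later plain reference.
      def mapAtomicFields(exprs: Seq[FieldExpr]): Seq[(Int, String)] =
        exprs.zipWithIndex.flatMap {
          case (TimeAttr, _) => None
          case (FieldRef(_), idx) if idx > 0 =>
            throw new TableException("Only the first field can reference an atomic type.")
          case (FieldRef(name), _) => Some((0, name))
        }
    }

For example, Seq(FieldRef("s"), TimeAttr) maps to Seq((0, "s")), while Seq(FieldRef("s1"), FieldRef("s2")) throws.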

http://git-wip-us.apache.org/repos/asf/flink/blob/168378d9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
index fd992c5..c360a6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -67,9 +67,13 @@ abstract class FlinkTable[T](
         }
         fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
       case aType: AtomicType[_] =>
-        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
+        if (fieldIndexes.exists(_ > 0)) {
           throw new TableException(
-            "Non-composite input type may have only a single field and its index must be 0.")
+            "Invalid index for table of atomic type encountered. Please report a bug.")
+        }
+        if (fieldIndexes.count(_ == 0) > 1) {
+          throw new TableException(
+            "Atomic input type may only be referenced by a single table field.")
         }
         Array(aType)
     }
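
On the physical-schema side, the checks above can be read as: for an atomic input, no field index may point past the single value (index 0), and at most one table field may be backed by that value. A hedged standalone restatement (hypothetical helper, not part of the patch):

    import org.apache.flink.table.api.TableException

    object AtomicIndexValidationSketch {

      // Mirrors the two checks added to FlinkTable for atomic input types.
      def checkAtomicFieldIndexes(fieldIndexes: Array[Int]): Unit = {
        if (fieldIndexes.exists(_ > 0)) {
          // An atomic type has no fields beyond index 0; anything else is an internal error.
          throw new TableException(
            "Invalid index for table of atomic type encountered. Please report a bug.")
        }
        if (fieldIndexes.count(_ == 0) > 1) {
          // Only one table field may reference the atomic value itself.
          throw new TableException(
            "Atomic input type may only be referenced by a single table field.")
        }
      }
    }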

http://git-wip-us.apache.org/repos/asf/flink/blob/168378d9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
index 3c1668f..3612423 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
@@ -50,6 +50,43 @@ class StreamTableEnvironmentTest extends TableTestBase {
   }
 
   @Test
+  def testProctimeAttributeWithAtomicInput(): Unit = {
+    val util = streamTestUtil()
+    // cannot replace an attribute with proctime
+    util.addTable[String]('s, 'pt.proctime)
+  }
+
+  @Test
+  def testReplacingRowtimeAttributeWithAtomicInput(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[Long]('rt.rowtime)
+  }
+
+  @Test
+  def testAppendedRowtimeAttributeWithAtomicInput(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[String]('s, 'rt.rowtime)
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttributeWithAtomicInput1(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[String]('s, 'rt.rowtime, 'pt.proctime)
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttributeWithAtomicInput2(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[String]('s, 'pt.proctime, 'rt.rowtime)
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttributeWithAtomicInput3(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[Long]('rt.rowtime, 'pt.proctime)
+  }
+
+  @Test
   def testProctimeAttribute(): Unit = {
     val util = streamTestUtil()
     // cannot replace an attribute with proctime

http://git-wip-us.apache.org/repos/asf/flink/blob/168378d9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index bb63abb..835bd77 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -38,7 +38,7 @@ import org.apache.flink.table.api.scala.stream.utils.StreamITCase
 import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
 import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
 import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
-import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
+import org.apache.flink.table.runtime.datastream.TimeAttributesITCase._
 import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -99,6 +99,70 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
+  def testAtomicType1(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(Seq(1L, 2L, 3L, 4L, 7L, 8L, 16L))
+      .assignTimestampsAndWatermarks(new AtomicTimestampWithEqualWatermark())
+    val table = stream.toTable(
+      tEnv, 'rowtime.rowtime, 'proctime.proctime)
+
+    val t = table
+      .where('proctime.cast(Types.LONG) > 0)
+      .select('rowtime.cast(Types.STRING))
+
+    val results = t.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.001",
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:00.003",
+      "1970-01-01 00:00:00.004",
+      "1970-01-01 00:00:00.007",
+      "1970-01-01 00:00:00.008",
+      "1970-01-01 00:00:00.016")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAtomicType2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(Seq(1L, 2L, 3L, 4L, 7L, 8L, 16L))
+      .assignTimestampsAndWatermarks(new AtomicTimestampWithEqualWatermark())
+    val table = stream.toTable(
+      tEnv, 'l, 'rowtime.rowtime, 'proctime.proctime)
+
+    val t = table
+      .where('proctime.cast(Types.LONG) > 0)
+      .select('l, 'rowtime.cast(Types.STRING))
+
+    val results = t.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1,1970-01-01 00:00:00.001",
+      "2,1970-01-01 00:00:00.002",
+      "3,1970-01-01 00:00:00.003",
+      "4,1970-01-01 00:00:00.004",
+      "7,1970-01-01 00:00:00.007",
+      "8,1970-01-01 00:00:00.008",
+      "16,1970-01-01 00:00:00.016")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
   def testCalcMaterialization(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -435,13 +499,29 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
 }
 
 object TimeAttributesITCase {
+
+  class AtomicTimestampWithEqualWatermark
+    extends AssignerWithPunctuatedWatermarks[Long] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: Long,
+        extractedTimestamp: Long): Watermark = {
+      new Watermark(extractedTimestamp)
+    }
+
+    override def extractTimestamp(
+        element: Long,
+        previousElementTimestamp: Long): Long = {
+      element
+    }
+  }
+
   class TimestampWithEqualWatermark
-  extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] {
+    extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] {
 
     override def checkAndGetNextWatermark(
         lastElement: (Long, Int, Double, Float, BigDecimal, String),
-        extractedTimestamp: Long)
-      : Watermark = {
+        extractedTimestamp: Long): Watermark = {
       new Watermark(extractedTimestamp)
     }
 
@@ -452,13 +532,12 @@ object TimeAttributesITCase {
     }
   }
 
-class TimestampWithEqualWatermarkPojo
-  extends AssignerWithPunctuatedWatermarks[TestPojo] {
+  class TimestampWithEqualWatermarkPojo
+    extends AssignerWithPunctuatedWatermarks[TestPojo] {
 
     override def checkAndGetNextWatermark(
         lastElement: TestPojo,
-        extractedTimestamp: Long)
-      : Watermark = {
+        extractedTimestamp: Long): Watermark = {
       new Watermark(extractedTimestamp)
     }
 
@@ -475,41 +554,41 @@ class TimestampWithEqualWatermarkPojo
     var b2: String = "skip me"
     var c: String = _
   }
-}
-
-class TestTableSource
-  extends StreamTableSource[Row]
-    with DefinedRowtimeAttribute
-    with DefinedProctimeAttribute {
 
-  override def getDataStream(env: JStreamExecEnv): DataStream[Row] = {
-
-    def toRow(i: Int, s: String, l: Long) = Row.of(i.asInstanceOf[JInt], s, l.asInstanceOf[JLong])
-
-    val rows = Seq(
-      toRow(1, "A", 1000L),
-      toRow(2, "B", 2000L),
-      toRow(3, "C", 3000L),
-      toRow(4, "D", 4000L),
-      toRow(5, "E", 5000L),
-      toRow(6, "F", 6000L)
-    )
-
-    env
-      .fromCollection(rows.asJava).returns(getReturnType)
-      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Row] {
-        override def extractAscendingTimestamp(r: Row): Long = r.getField(2).asInstanceOf[Long]
-      })
-  }
+  class TestTableSource
+    extends StreamTableSource[Row]
+      with DefinedRowtimeAttribute
+      with DefinedProctimeAttribute {
+
+    override def getDataStream(env: JStreamExecEnv): DataStream[Row] = {
+
+      def toRow(i: Int, s: String, l: Long) = Row.of(i.asInstanceOf[JInt], s, l.asInstanceOf[JLong])
+
+      val rows = Seq(
+        toRow(1, "A", 1000L),
+        toRow(2, "B", 2000L),
+        toRow(3, "C", 3000L),
+        toRow(4, "D", 4000L),
+        toRow(5, "E", 5000L),
+        toRow(6, "F", 6000L)
+      )
+
+      env
+        .fromCollection(rows.asJava).returns(getReturnType)
+        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Row] {
+          override def extractAscendingTimestamp(r: Row): Long = r.getField(2).asInstanceOf[Long]
+        })
+    }
 
-  override def getRowtimeAttribute: String = "rowtime"
+    override def getRowtimeAttribute: String = "rowtime"
 
-  override def getProctimeAttribute: String = "proctime"
+    override def getProctimeAttribute: String = "proctime"
 
-  override def getReturnType: TypeInformation[Row] = {
-    new RowTypeInfo(
-      Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]],
-      Array("a", "b", "c")
-    )
+    override def getReturnType: TypeInformation[Row] = {
+      new RowTypeInfo(
+        Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]],
+        Array("a", "b", "c")
+      )
+    }
   }
 }

