flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/4] flink git commit: [FLINK-7939] [table] Fix Table conversion of DataStream of AtomicType.
Date Wed, 01 Nov 2017 01:52:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master 36b663f45 -> 0e92b6632


[FLINK-7939] [table] Fix Table conversion of DataStream of AtomicType.

This closes #4917.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/505d478d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/505d478d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/505d478d

Branch: refs/heads/master
Commit: 505d478d55c93e07a7227e375939eca19ec4d082
Parents: 36b663f
Author: Fabian Hueske <fhueske@apache.org>
Authored: Sat Oct 28 22:13:23 2017 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Oct 31 21:40:33 2017 +0100

----------------------------------------------------------------------
 .../flink/table/api/TableEnvironment.scala      |  9 +-
 .../api/stream/StreamTableEnvironmentTest.scala | 37 ++++++++
 .../runtime/stream/TimeAttributesITCase.scala   | 88 +++++++++++++++++++-
 3 files changed, 128 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/505d478d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index fec7f1a..c3cab13 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -810,10 +810,13 @@ abstract class TableEnvironment(val config: TableConfig) {
             "Please specify the type of the input with a RowTypeInfo.")
       case a: AtomicType[_] =>
         exprs.zipWithIndex flatMap {
+          case (_: TimeAttribute, _) =>
+            None
+          case (UnresolvedFieldReference(name), idx) if idx > 0 =>
+            // only accept the first field for an atomic type
+            throw new TableException("Only the first field can reference an atomic type.")
           case (UnresolvedFieldReference(name), idx) =>
-            if (idx > 0) {
-              throw new TableException("Table of atomic type can only have a single field.")
-            }
+            // first field reference is mapped to atomic type
             Some((0, name))
           case _ => throw new TableException("Field reference expression requested.")
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/505d478d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 1b99679..863d07b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -66,6 +66,43 @@ class StreamTableEnvironmentTest extends TableTestBase {
   }
 
   @Test
+  def testProctimeAttributeWithAtomicInput(): Unit = {
+    val util = streamTestUtil()
+    // cannot replace an attribute with proctime
+    util.addTable[String]('s, 'pt.proctime)
+  }
+
+  @Test
+  def testReplacingRowtimeAttributeWithAtomicInput(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[Long]('rt.rowtime)
+  }
+
+  @Test
+  def testAppendedRowtimeAttributeWithAtomicInput(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[String]('s, 'rt.rowtime)
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttributeWithAtomicInput1(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[String]('s, 'rt.rowtime, 'pt.proctime)
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttributeWithAtomicInput2(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[String]('s, 'pt.proctime, 'rt.rowtime)
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttributeWithAtomicInput3(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[Long]('rt.rowtime, 'pt.proctime)
+  }
+
+  @Test
   def testProctimeAttribute(): Unit = {
     val util = streamTestUtil()
     // cannot replace an attribute with proctime

http://git-wip-us.apache.org/repos/asf/flink/blob/505d478d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index b7f97f9..5086601 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -34,7 +34,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
 import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types}
 import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
-import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark,
TimestampWithEqualWatermarkPojo}
+import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark,
TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
 import org.apache.flink.table.runtime.utils.StreamITCase
 import org.apache.flink.table.utils.TestTableSourceWithTime
 import org.apache.flink.types.Row
@@ -58,6 +58,70 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
 
   @Test
+  def testAtomicType1(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(Seq(1L, 2L, 3L, 4L, 7L, 8L, 16L))
+      .assignTimestampsAndWatermarks(new AtomicTimestampWithEqualWatermark())
+    val table = stream.toTable(
+      tEnv, 'rowtime.rowtime, 'proctime.proctime)
+
+    val t = table
+      .where('proctime.cast(Types.LONG) > 0)
+      .select('rowtime.cast(Types.STRING))
+
+    val results = t.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.001",
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:00.003",
+      "1970-01-01 00:00:00.004",
+      "1970-01-01 00:00:00.007",
+      "1970-01-01 00:00:00.008",
+      "1970-01-01 00:00:00.016")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAtomicType2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(Seq(1L, 2L, 3L, 4L, 7L, 8L, 16L))
+      .assignTimestampsAndWatermarks(new AtomicTimestampWithEqualWatermark())
+    val table = stream.toTable(
+      tEnv, 'l, 'rowtime.rowtime, 'proctime.proctime)
+
+    val t = table
+      .where('proctime.cast(Types.LONG) > 0)
+      .select('l, 'rowtime.cast(Types.STRING))
+
+    val results = t.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1,1970-01-01 00:00:00.001",
+      "2,1970-01-01 00:00:00.002",
+      "3,1970-01-01 00:00:00.003",
+      "4,1970-01-01 00:00:00.004",
+      "7,1970-01-01 00:00:00.007",
+      "8,1970-01-01 00:00:00.008",
+      "16,1970-01-01 00:00:00.016")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
   def testCalcMaterialization(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -566,8 +630,26 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase
{
 }
 
 object TimeAttributesITCase {
+
+  class AtomicTimestampWithEqualWatermark
+    extends AssignerWithPunctuatedWatermarks[Long] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: Long,
+        extractedTimestamp: Long)
+    : Watermark = {
+      new Watermark(extractedTimestamp)
+    }
+
+    override def extractTimestamp(
+        element: Long,
+        previousElementTimestamp: Long): Long = {
+      element
+    }
+  }
+
   class TimestampWithEqualWatermark
-  extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)]
{
+    extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)]
{
 
     override def checkAndGetNextWatermark(
         lastElement: (Long, Int, Double, Float, BigDecimal, String),
@@ -584,7 +666,7 @@ object TimeAttributesITCase {
   }
 
   class TimestampWithEqualWatermarkPojo
-  extends AssignerWithPunctuatedWatermarks[TestPojo] {
+    extends AssignerWithPunctuatedWatermarks[TestPojo] {
 
     override def checkAndGetNextWatermark(
         lastElement: TestPojo,


Mime
View raw message