flink-commits mailing list archives

From twal...@apache.org
Subject flink git commit: [FLINK-6881] [FLINK-6896] [table] Creating a table from a POJO and defining a time attribute fails
Date Tue, 20 Jun 2017 12:40:31 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 782c99873 -> 930216ef9


[FLINK-6881] [FLINK-6896] [table] Creating a table from a POJO and defining a time attribute fails

This closes #4144.
This closes #4111.

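For context, a sketch of the usage this fix enables, based on the test added
below (TestPojo has fields a: Int, b: Long, b2: String, c: String; env and tEnv
are the usual StreamExecutionEnvironment and StreamTableEnvironment):

    // Define time attributes on a POJO-backed stream. Aliases resolve
    // expressions to POJO fields by name, so fields can be reordered and b2
    // skipped; 'b (a Long) is replaced by the rowtime attribute.
    val stream = env
      .fromElements(p1, p2)
      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermarkPojo)
    val table = stream.toTable(tEnv, ('b as 'b).rowtime, 'c as 'c, 'a as 'a)

    // The same schema via a Java-style expression string:
    val table4 = stream.toTable(
      tEnv,
      ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as a"): _*)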

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/930216ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/930216ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/930216ef

Branch: refs/heads/release-1.3
Commit: 930216ef95cf7098d7f9f3ba7403dacc3b433817
Parents: 782c998
Author: twalthr <twalthr@apache.org>
Authored: Mon Jun 19 17:06:44 2017 +0200
Committer: twalthr <twalthr@apache.org>
Committed: Tue Jun 20 14:35:34 2017 +0200

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      | 112 +++++++++++--------
 .../apache/flink/table/api/scala/package.scala  |   2 +-
 .../calcite/RelTimeIndicatorConverter.scala     |  27 ++++-
 .../flink/table/codegen/CodeGenerator.scala     |  68 +++++------
 .../table/expressions/ExpressionParser.scala    |  18 ++-
 .../flink/table/api/java/utils/Pojos.java       |   2 +-
 .../datastream/TimeAttributesITCase.scala       |  89 ++++++++++++++-
 7 files changed, 218 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/930216ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 178bd9f..eb3eb5c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -20,11 +20,10 @@ package org.apache.flink.table.api
 
 import _root_.java.lang.{Boolean => JBool}
 import _root_.java.util.concurrent.atomic.AtomicInteger
-import _root_.java.util.{List => JList}
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.plan.hep.HepMatchOrder
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelVisitor}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
 import org.apache.calcite.sql.SqlKind
@@ -34,14 +33,14 @@ import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
+import org.apache.flink.table.calcite.RelTimeIndicatorConverter
 import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
+import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait, _}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -438,39 +437,69 @@ abstract class StreamTableEnvironment(
     var rowtime: Option[(Int, String)] = None
     var proctime: Option[(Int, String)] = None
 
-    exprs.zipWithIndex.foreach {
-      case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
-        if (rowtime.isDefined) {
+    def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
+      if (rowtime.isDefined) {
+        throw new TableException(
+          "The rowtime attribute can only be defined once in a table schema.")
+      } else {
+        val mappedIdx = streamType match {
+          case pti: PojoTypeInfo[_] =>
+            pti.getFieldIndex(origName.getOrElse(name))
+          case _ => idx;
+        }
+        // check type of field that is replaced
+        if (mappedIdx < 0) {
           throw new TableException(
-            "The rowtime attribute can only be defined once in a table schema.")
-        } else {
-          // check type of field that is replaced
-          if (idx < fieldTypes.length &&
-            !(TypeCheckUtils.isLong(fieldTypes(idx)) ||
-              TypeCheckUtils.isTimePoint(fieldTypes(idx)))) {
-            throw new TableException(
-              "The rowtime attribute can only be replace a field with a valid time type,
such as " +
-                "Timestamp or Long.")
-          }
-          rowtime = Some(idx, name)
+            s"The rowtime attribute can only replace a valid field. " +
+              s"${origName.getOrElse(name)} is not a field of type $streamType.")
         }
-      case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
-        if (proctime.isDefined) {
+        else if (mappedIdx < fieldTypes.length &&
+          !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
+            TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
+          throw new TableException(
+            s"The rowtime attribute can only replace a field with a valid time type, " +
+              s"such as Timestamp or Long. But was: ${fieldTypes(mappedIdx)}")
+        }
+
+        rowtime = Some(idx, name)
+      }
+    }
+
+    def extractProctime(idx: Int, name: String): Unit = {
+      if (proctime.isDefined) {
           throw new TableException(
             "The proctime attribute can only be defined once in a table schema.")
-        } else {
-          // check that proctime is only appended
-          if (idx < fieldTypes.length) {
-            throw new TableException(
-              "The proctime attribute can only be appended to the table schema and not replace
" +
-                "an existing field. Please move it to the end of the schema.")
-          }
-          proctime = Some(idx, name)
+      } else {
+        // check that proctime is only appended
+        if (idx < fieldTypes.length) {
+          throw new TableException(
+            "The proctime attribute can only be appended to the table schema and not replace
" +
+              "an existing field. Please move it to the end of the schema.")
         }
-      case (u: UnresolvedFieldReference, _) => fieldNames = u.name :: fieldNames
+        proctime = Some(idx, name)
+      }
+    }
 
-      case _ =>
-        throw new TableException("Time attributes can only be defined on field references.")
+    exprs.zipWithIndex.foreach {
+      case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
+        extractRowtime(idx, name, None)
+
+      case (RowtimeAttribute(Alias(UnresolvedFieldReference(origName), name, _)), idx) =>
+        extractRowtime(idx, name, Some(origName))
+
+      case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
+        extractProctime(idx, name)
+
+      case (ProctimeAttribute(Alias(UnresolvedFieldReference(_), name, _)), idx) =>
+        extractProctime(idx, name)
+
+      case (UnresolvedFieldReference(name), _) => fieldNames = name :: fieldNames
+
+      case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames = name :: fieldNames
+
+      case (e, _) =>
+        throw new TableException(s"Time attributes can only be defined on field references
or " +
+          s"aliases of field references. But was: $e")
     }
 
     if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) {
@@ -606,21 +635,10 @@ abstract class StreamTableEnvironment(
     val relNode = table.getRelNode
     val dataStreamPlan = optimize(relNode, updatesAsRetraction)
 
-    // zip original field names with optimized field types
-    val fieldTypes = relNode.getRowType.getFieldList.asScala
-      .zip(dataStreamPlan.getRowType.getFieldList.asScala)
-      // get name of original plan and type of optimized plan
-      .map(x => (x._1.getName, x._2.getType))
-      // add field indexes
-      .zipWithIndex
-      // build new field types
-      .map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2))
-
-    // build a record type from list of field types
-    val rowType = new RelRecordType(
-      fieldTypes.toList.asJava.asInstanceOf[JList[RelDataTypeField]])
-
-    translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
+    // we convert the logical row type to the output row type
+    val convertedOutputType = RelTimeIndicatorConverter.convertOutputType(relNode)
+
+    translate(dataStreamPlan, convertedOutputType, queryConfig, withChangeFlag)
   }
 
   /**

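The key change above: for POJO input, an attribute's position in the expression
list no longer determines which physical field it replaces; the original field
name (or the alias origin) is resolved through PojoTypeInfo.getFieldIndex, which
returns -1 for unknown fields. A minimal plain-Scala sketch of that mapping (not
the Flink API; the field order mimics PojoTypeInfo's alphabetical ordering):

    // POJO fields are ordered by name, so expression position says nothing
    // about physical position.
    val pojoFields = Seq("a", "b", "b2", "c")
    def fieldIndex(name: String): Int = pojoFields.indexOf(name) // -1 if unknown

    // ('b as 'b).rowtime appears at expression index 0, but 'b is physical field 1:
    assert(fieldIndex("b") == 1)
    assert(fieldIndex("missing") == -1) // would raise the TableException above
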
http://git-wip-us.apache.org/repos/asf/flink/blob/930216ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
index 9d15c14..cc1a388 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -82,7 +82,7 @@ package object scala extends ImplicitExpressionConversions {
   }
 
   implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
-    new DataStreamConversions[T](set, set.dataType.asInstanceOf[CompositeType[T]])
+    new DataStreamConversions[T](set, set.dataType)
   }
 
   implicit def table2RowDataStream(table: Table): DataStream[Row] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/930216ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 21fa70b..b28e3f8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.calcite
 
-import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl, RelRecordType}
 import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.logical._
 import org.apache.calcite.rel.{RelNode, RelShuttle}
@@ -26,10 +26,10 @@ import org.apache.calcite.rex._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
 import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.functions.TimeMaterializationSqlFunction
-import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
 import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType
+import org.apache.flink.table.functions.TimeMaterializationSqlFunction
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -391,4 +391,25 @@ object RelTimeIndicatorConverter {
       convertedRoot
     }
   }
+
+  def convertOutputType(rootRel: RelNode): RelDataType = {
+
+    val timestamp = rootRel
+      .getCluster
+      .getRexBuilder
+      .getTypeFactory
+      .asInstanceOf[FlinkTypeFactory]
+      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+    // convert all time indicators types to timestamps
+    val fields = rootRel.getRowType.getFieldList.map { field =>
+      if (isTimeIndicatorType(field.getType)) {
+        new RelDataTypeFieldImpl(field.getName, field.getIndex, timestamp)
+      } else {
+        field
+      }
+    }
+
+    new RelRecordType(fields)
+  }
 }

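convertOutputType replaces every time indicator field in the root node's row
type with a plain SQL timestamp, so the external output type never exposes the
internal indicator types. A self-contained model of that transformation (plain
Scala stand-ins, not the actual Calcite types):

    sealed trait FieldType
    case object TimeIndicator extends FieldType // internal rowtime/proctime marker
    case object SqlTimestamp extends FieldType  // plain TIMESTAMP
    case class Other(name: String) extends FieldType

    def convertOutputType(fields: Seq[(String, FieldType)]): Seq[(String, FieldType)] =
      fields.map {
        case (name, TimeIndicator) => (name, SqlTimestamp)
        case field                 => field
      }
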
http://git-wip-us.apache.org/repos/asf/flink/blob/930216ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 52a9dcd..67a91aa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -871,12 +871,16 @@ class CodeGenerator(
       returnType: TypeInformation[_ <: Any],
       resultFieldNames: Seq[String])
     : GeneratedExpression = {
-    val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-      yield generateInputAccess(input1, input1Term, i, input1Mapping)
+    val input1AccessExprs = input1Mapping.map { idx =>
+      generateInputAccess(input1, input1Term, idx)
+    }
 
     val input2AccessExprs = input2 match {
-      case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-        yield generateInputAccess(ti, input2Term, i, input2Mapping)
+      case Some(ti) =>
+        input2Mapping.map { idx =>
+          generateInputAccess(ti, input2Term, idx)
+        }.toSeq
+
       case None => Seq() // add nothing
     }
 
@@ -887,15 +891,18 @@ class CodeGenerator(
     * Generates an expression from the left input and the right table function.
     */
   def generateCorrelateAccessExprs: (Seq[GeneratedExpression], Seq[GeneratedExpression]) = {
-    val input1AccessExprs = for (i <- 0 until input1.getArity)
-      yield generateInputAccess(input1, input1Term, i, input1Mapping)
+    val input1AccessExprs = input1Mapping.map { idx =>
+      generateInputAccess(input1, input1Term, idx)
+    }
 
     val input2AccessExprs = input2 match {
-      case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
+      case Some(ti) =>
        // use generateFieldAccess instead of generateInputAccess to avoid the generated table
         // function's field access code is put on the top of function body rather than
         // the while loop
-        yield generateFieldAccess(ti, input2Term, i, input2Mapping)
+        input2Mapping.map { idx =>
+          generateFieldAccess(ti, input2Term, idx)
+        }.toSeq
      case None => throw new CodeGenException("Type information of input2 must not be null.")
     }
     (input1AccessExprs, input2AccessExprs)
@@ -1150,11 +1157,9 @@ class CodeGenerator(
   override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = {
     // if inputRef index is within size of input1 we work with input1, input2 otherwise
     val input = if (inputRef.getIndex < input1.getArity) {
-      (input1, input1Term, input1Mapping)
+      (input1, input1Term)
     } else {
-      (input2.getOrElse(throw new CodeGenException("Invalid input access.")),
-        input2Term,
-        input2Mapping)
+      (input2.getOrElse(throw new CodeGenException("Invalid input access.")), input2Term)
     }
 
     val index = if (input._2 == input1Term) {
@@ -1163,7 +1168,7 @@ class CodeGenerator(
       inputRef.getIndex - input1.getArity
     }
 
-    generateInputAccess(input._1, input._2, index, input._3)
+    generateInputAccess(input._1, input._2, index)
   }
 
   override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression = {
@@ -1172,8 +1177,7 @@ class CodeGenerator(
     val fieldAccessExpr = generateFieldAccess(
       refExpr.resultType,
       refExpr.resultTerm,
-      index,
-      input1Mapping)
+      index)
 
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
@@ -1581,8 +1585,7 @@ class CodeGenerator(
   private def generateInputAccess(
       inputType: TypeInformation[_ <: Any],
       inputTerm: String,
-      index: Int,
-      fieldMapping: Array[Int])
+      index: Int)
     : GeneratedExpression = {
     // if input has been used before, we can reuse the code that
     // has already been generated
@@ -1594,9 +1597,9 @@ class CodeGenerator(
       // generate input access and unboxing if necessary
       case None =>
         val expr = if (nullableInput) {
-          generateNullableInputFieldAccess(inputType, inputTerm, index, fieldMapping)
+          generateNullableInputFieldAccess(inputType, inputTerm, index)
         } else {
-          generateFieldAccess(inputType, inputTerm, index, fieldMapping)
+          generateFieldAccess(inputType, inputTerm, index)
         }
 
         reusableInputUnboxingExprs((inputTerm, index)) = expr
@@ -1609,27 +1612,19 @@ class CodeGenerator(
   private def generateNullableInputFieldAccess(
       inputType: TypeInformation[_ <: Any],
       inputTerm: String,
-      index: Int,
-      fieldMapping: Array[Int])
+      index: Int)
     : GeneratedExpression = {
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
 
     val fieldType = inputType match {
-      case ct: CompositeType[_] =>
-        val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) {
-          fieldMapping(index)
-        }
-        else {
-          index
-        }
-        ct.getTypeAt(fieldIndex)
+      case ct: CompositeType[_] => ct.getTypeAt(index)
       case at: AtomicType[_] => at
       case _ => throw new CodeGenException("Unsupported type for input field access.")
     }
     val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
     val defaultValue = primitiveDefaultValue(fieldType)
-    val fieldAccessExpr = generateFieldAccess(inputType, inputTerm, index, fieldMapping)
+    val fieldAccessExpr = generateFieldAccess(inputType, inputTerm, index)
 
     val inputCheckCode =
       s"""
@@ -1652,19 +1647,12 @@ class CodeGenerator(
   private def generateFieldAccess(
       inputType: TypeInformation[_],
       inputTerm: String,
-      index: Int,
-      fieldMapping: Array[Int])
+      index: Int)
     : GeneratedExpression = {
     inputType match {
       case ct: CompositeType[_] =>
-        val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) {
-          fieldMapping(index)
-        }
-        else {
-          index
-        }
-        val accessor = fieldAccessorFor(ct, fieldIndex)
-        val fieldType: TypeInformation[Any] = ct.getTypeAt(fieldIndex)
+        val accessor = fieldAccessorFor(ct, index)
+        val fieldType: TypeInformation[Any] = ct.getTypeAt(index)
         val fieldTypeTerm = boxedTypeTermForTypeInfo(fieldType)
 
         accessor match {

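A consequence of the field-index resolution moving into StreamTableEnvironment:
the code generator no longer re-maps indices through fieldMapping; the mappings
it receives already hold physical field indices, and access expressions are
generated by iterating the mapping directly. One subtle effect, sketched in
plain Scala (input1Mapping is a hypothetical example value):

    val input1Mapping = Array(1, 3, 0) // physical indices, resolved upstream

    // before: iterate physical positions, filtered by the mapping (ascending)
    val before = for (i <- 0 until 4 if input1Mapping.contains(i)) yield i // 0, 1, 3
    // after: iterate the mapping itself (order follows the mapping)
    val after = input1Mapping.toSeq                                        // 1, 3, 0
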
http://git-wip-us.apache.org/repos/asf/flink/blob/930216ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index e1ffb33..f67fbac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -469,13 +469,15 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val timeIndicator: PackratParser[Expression] = proctime | rowtime
 
-  lazy val proctime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ {
-    case f ~ _ ~ _ => ProctimeAttribute(f)
-  }
+  lazy val proctime: PackratParser[Expression] =
+    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ PROCTIME ^^
{
+      case f ~ _ ~ _ => ProctimeAttribute(f)
+    }
 
-  lazy val rowtime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ {
-    case f ~ _ ~ _ => RowtimeAttribute(f)
-  }
+  lazy val rowtime: PackratParser[Expression] =
+    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ ROWTIME ^^
{
+      case f ~ _ ~ _ => RowtimeAttribute(f)
+    }
 
   // alias
 
@@ -485,6 +487,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
   } | logic
 
+  lazy val aliasMapping: PackratParser[Expression] = fieldReference ~ AS ~ fieldReference ^^ {
+      case e ~ _ ~ name => Alias(e, name.name)
+  }
+
   lazy val expression: PackratParser[Expression] = timeIndicator | overConstant | alias |
     failure("Invalid expression.")
 

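With the new aliasMapping rule, a time indicator may now wrap an alias,
optionally parenthesized, in addition to a bare field reference. A short
illustration of what the extended grammar accepts (the expression string is
taken from the test below):

    import org.apache.flink.table.expressions.ExpressionParser

    // Accepted since this change; previously only plain "b.rowtime" parsed:
    val exprs = ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as a")
    // roughly: RowtimeAttribute(Alias('b, "b")), Alias('c, "c"), Alias('a, "a")
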
http://git-wip-us.apache.org/repos/asf/flink/blob/930216ef/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
index 3048835..69b7890 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 import java.sql.Timestamp;
 
 /**
- * POJOs for table api testing.
+ * POJOs for Table API testing.
  */
 public class Pojos {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/930216ef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index 73cb701..c434f47 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -31,8 +31,8 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.StreamITCase
 import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
 import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
-import org.apache.flink.table.expressions.TimeIntervalUnit
-import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.TimestampWithEqualWatermark
+import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
+import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
@@ -337,6 +337,67 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testPojoSupport(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val p1 = new TestPojo
+    p1.a = 12
+    p1.b = 42L
+    p1.c = "Test me."
+
+    val p2 = new TestPojo
+    p2.a = 13
+    p2.b = 43L
+    p2.c = "And me."
+
+    val stream = env
+      .fromElements(p1, p2)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermarkPojo)
+    // use aliases, swap all attributes, and skip b2
+    val table = stream.toTable(tEnv, ('b as 'b).rowtime, 'c as 'c, 'a as 'a)
+    // no aliases, no swapping
+    val table2 = stream.toTable(tEnv, 'a, 'b.rowtime, 'c)
+    // use proctime, no skipping
+    val table3 = stream.toTable(tEnv, 'a, 'b.rowtime, 'c, 'b2, 'proctime.proctime)
+
+    // Java expressions
+
+    // use aliases, swap all attributes, and skip b2
+    val table4 = stream.toTable(
+      tEnv,
+      ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as a"): _*)
+    // no aliases, no swapping
+    val table5 = stream.toTable(
+      tEnv,
+      ExpressionParser.parseExpressionList("a, b.rowtime, c"): _*)
+
+    val t = table.select('b, 'c , 'a)
+      .unionAll(table2.select('b, 'c, 'a))
+      .unionAll(table3.select('b, 'c, 'a))
+      .unionAll(table4.select('b, 'c, 'a))
+      .unionAll(table5.select('b, 'c, 'a))
+
+    val results = t.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.042,Test me.,12",
+      "1970-01-01 00:00:00.042,Test me.,12",
+      "1970-01-01 00:00:00.042,Test me.,12",
+      "1970-01-01 00:00:00.042,Test me.,12",
+      "1970-01-01 00:00:00.042,Test me.,12",
+      "1970-01-01 00:00:00.043,And me.,13",
+      "1970-01-01 00:00:00.043,And me.,13",
+      "1970-01-01 00:00:00.043,And me.,13",
+      "1970-01-01 00:00:00.043,And me.,13",
+      "1970-01-01 00:00:00.043,And me.,13")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 object TimeAttributesITCase {
@@ -356,4 +417,28 @@ object TimeAttributesITCase {
       element._1
     }
   }
+
+  class TimestampWithEqualWatermarkPojo
+  extends AssignerWithPunctuatedWatermarks[TestPojo] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: TestPojo,
+        extractedTimestamp: Long)
+      : Watermark = {
+      new Watermark(extractedTimestamp)
+    }
+
+    override def extractTimestamp(
+        element: TestPojo,
+        previousElementTimestamp: Long): Long = {
+      element.b
+    }
+  }
+
+  class TestPojo() {
+    var a: Int = _
+    var b: Long = _
+    var b2: String = "skip me"
+    var c: String = _
+  }
 }

