flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible
Date Thu, 04 Jan 2018 23:26:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312197#comment-16312197 ]

ASF GitHub Bot commented on FLINK-8203:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159779243
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala ---
    @@ -18,231 +18,574 @@
     
     package org.apache.flink.table.api
     
    -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
     import org.apache.flink.api.common.typeinfo.TypeInformation
    -import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
     import org.apache.flink.api.scala._
    -import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
    -import org.apache.flink.table.runtime.types.CRowTypeInfo
    -import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
    +import org.apache.flink.table.api.TableEnvironmentTest._
    +import org.apache.flink.table.api.Types._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{PROCTIME_INDICATOR => PROCTIME}
    +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{ROWTIME_INDICATOR => ROWTIME}
    +import org.apache.flink.table.utils.TableTestBase
     import org.apache.flink.types.Row
    -import org.junit.Assert.assertEquals
    +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo
    +import org.apache.flink.api.scala.typeutils.UnitTypeInfo
     import org.junit.Test
     
     class TableEnvironmentTest extends TableTestBase {
     
    -  val tEnv = new MockTableEnvironment
    -
    -  val tupleType = new TupleTypeInfo(
    -    INT_TYPE_INFO,
    -    STRING_TYPE_INFO,
    -    DOUBLE_TYPE_INFO)
    -
    -  val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
    -
    -  val cRowType = new CRowTypeInfo(rowType)
    -
    -  val caseClassType: TypeInformation[CClass] = implicitly[TypeInformation[CClass]]
    -
    -  val pojoType: TypeInformation[PojoClass] = TypeExtractor.createTypeInfo(classOf[PojoClass])
    -
    -  val atomicType = INT_TYPE_INFO
    -
    -  val genericRowType = new GenericTypeInfo[Row](classOf[Row])
    +  // ----------------------------------------------------------------------------------------------
    +  // schema definition by position
    +  // ----------------------------------------------------------------------------------------------
     
       @Test
    -  def testGetFieldInfoRow(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(rowType)
    -
    -    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testProjectByPosition(): Unit = {
    +    val utils = Seq(streamTestUtil(), batchTestUtil())
    +
    +    utils.foreach { util =>
    +
    +      // case class
    +      util.verifySchema(
    +        util.addTable[CClass]('a, 'b, 'c),
    +        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[CClass]('a, 'b),
    +        Seq("a" -> INT, "b" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable[CClass]('a),
    +        Seq("a" -> INT))
    +
    +      // row
    +      util.verifySchema(
    +        util.addTable('a, 'b, 'c)(TEST_ROW),
    +        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable('a, 'b)(TEST_ROW),
    +        Seq("a" -> INT, "b" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable('a)(TEST_ROW),
    +        Seq("a" -> INT))
    +
    +      // tuple
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c),
    +        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('a, 'b),
    +        Seq("a" -> INT, "b" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('a),
    +        Seq("a" -> INT))
    +    }
       }
     
       @Test
    -  def testGetFieldInfoRowNames(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(
    -      rowType,
    -      Array(
    -        UnresolvedFieldReference("name1"),
    -        UnresolvedFieldReference("name2"),
    -        UnresolvedFieldReference("name3")
    -      ))
    -
    -    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testStreamProjectWithAddingTimeAttributesByPosition(): Unit = {
    +    val util = streamTestUtil()
    +
    +    // case class
    +    util.verifySchema(
    +      util.addTable[CClass]('a, 'b, 'c , 'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime, 'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, "proctime" -> PROCTIME))
    +
    +    // row
    +    util.verifySchema(
    +      util.addTable('a, 'b, 'c, 'proctime.proctime)(TEST_ROW),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable('a, 'b, 'c, 'rowtime.rowtime)(TEST_ROW),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
    +
    +    util.verifySchema(
    +      util.addTable('a, 'b, 'c, 'rowtime.rowtime, 'proctime.proctime)(TEST_ROW),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, "proctime" -> PROCTIME))
    +
    +    // tuple
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 'rowtime.rowtime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 'rowtime.rowtime, 'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, "proctime" -> PROCTIME))
       }
     
       @Test
    -  def testGetFieldInfoTuple(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(tupleType)
    -
    -    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testStreamAliasWithReplacingTimeAttributesByPosition(): Unit = {
    +    val util = streamTestUtil()
    +
    +    // case class
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('a, 'b.rowtime, 'c),
    +      Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
    +
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('a, ('b as 'new).rowtime, 'c),
    +      Seq("a" -> INT, "new" -> ROWTIME, "c" -> STRING))
    +
    +    // row
    +    util.verifySchema(
    +      util.addTable('a, 'b.rowtime, 'c)(TEST_ROW_WITH_TIME),
    +      Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
    +
    +    util.verifySchema(
    +      util.addTable('a, ('b as 'new).rowtime, 'c)(TEST_ROW_WITH_TIME),
    +      Seq("a" -> INT, "new" -> ROWTIME, "c" -> STRING))
    +
    +    // tuple
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, Long, String]]('a, ('b as 'new).rowtime, 'c),
    +      Seq("a" -> INT, "new" -> ROWTIME, "c" -> STRING))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, Long, String]]('a, ('b as 'new).rowtime, 'c),
    +      Seq("a" -> INT, "new" -> ROWTIME, "c" -> STRING))
       }
     
    -  @Test
    -  def testGetFieldInfoCClass(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(caseClassType)
    +  // ----------------------------------------------------------------------------------------------
    +  // schema definition by name
    +  // ----------------------------------------------------------------------------------------------
     
    -    fieldInfo._1.zip(Array("cf1", "cf2", "cf3")).foreach(x => assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  @Test
    +  def testProjectByName(): Unit = {
    +    val utils = Seq(streamTestUtil(), batchTestUtil())
    +
    +    utils.foreach { util =>
    +
    +      // atomic
    +      util.verifySchema(
    +        util.addTable[Int](),
    +        Seq("f0" -> INT))
    +
    +      util.verifySchema(
    +        util.addTable[Int]('myint),
    +        Seq("myint" -> INT))
    +
    +      // case class
    +      util.verifySchema(
    +        util.addTable[CClass](),
    +        Seq("cf1" -> INT, "cf2" -> STRING, "cf3" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[CClass]('cf1, 'cf2),
    +        Seq("cf1" -> INT, "cf2" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable[CClass]('cf1, 'cf3),
    +        Seq("cf1" -> INT, "cf3" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[CClass]('cf3, 'cf1),
    +        Seq("cf3" -> DOUBLE, "cf1" -> INT))
    +
    +      // row
    +      util.verifySchema(
    +        util.addTable()(TEST_ROW),
    +        Seq("rf1" -> INT, "rf2" -> STRING, "rf3" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable('rf1, 'rf2)(TEST_ROW),
    +        Seq("rf1" -> INT, "rf2" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable('rf1, 'rf3)(TEST_ROW),
    +        Seq("rf1" -> INT, "rf3" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable('rf3, 'rf1)(TEST_ROW),
    +        Seq("rf3" -> DOUBLE, "rf1" -> INT))
    +
    +      // tuple
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]](),
    +        Seq("f0" -> INT, "f1" -> STRING, "f2" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('f0, 'f1),
    +        Seq("f0" -> INT, "f1" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('f0, 'f2),
    +        Seq("f0" -> INT, "f2" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('f2, 'f0),
    +        Seq("f2" -> DOUBLE, "f0" -> INT))
    +
    +      // pojo
    +      util.verifySchema(
    +        util.addTable[PojoClass](),
    +        Seq("pf1" -> INT, "pf2" -> STRING, "pf3" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[PojoClass]('pf1, 'pf2),
    +        Seq("pf1" -> INT, "pf2" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable[PojoClass]('pf1, 'pf3),
    +        Seq("pf1" -> INT, "pf3" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[PojoClass]('pf3, 'pf1),
    +        Seq("pf3" -> DOUBLE, "pf1" -> INT))
    +
    +      // generic
    +      util.verifySchema(
    +        util.addTable[Class[_]]('mygeneric),
    +        Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
    +
    +      util.verifySchema(
    +        util.addTable[Class[_]](),
    +        Seq("f0" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
    +
    +      // any type info
    +      util.verifySchema(
    +        util.addTable[Unit](),
    +        Seq("f0" -> new UnitTypeInfo()))
    +
    +      util.verifySchema(
    +        util.addTable[Unit]('unit),
    +        Seq("unit" -> new UnitTypeInfo()))
    +    }
       }
     
       @Test
    -  def testGetFieldInfoPojo(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(pojoType)
    -
    -    fieldInfo._1.zip(Array("pf1", "pf2", "pf3")).foreach(x => assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testStreamProjectWithAddingTimeAttributesByName(): Unit = {
    +    val util = streamTestUtil()
    +
    +    // atomic
    +    util.verifySchema(
    +      util.addTable[Int]('proctime.proctime, 'myint),
    +      Seq("proctime" -> PROCTIME, "myint" -> INT))
    +
    +    util.verifySchema(
    +      util.addTable[Int]('rowtime.rowtime, 'myint),
    +      Seq("rowtime" -> ROWTIME, "myint" -> INT))
    +
    +    util.verifySchema(
    +      util.addTable[Int]('myint, 'proctime.proctime),
    +      Seq("myint" -> INT, "proctime" -> PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[Int]('myint, 'rowtime.rowtime),
    +      Seq("myint" -> INT, "rowtime" -> ROWTIME))
    +
    +    // case class
    +    util.verifySchema(
    +      util.addTable[CClass]('proctime.proctime, 'cf1, 'cf3),
    +      Seq("proctime" -> PROCTIME, "cf1" -> INT, "cf3" -> DOUBLE))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('rowtime.rowtime, 'cf3, 'cf1),
    +      Seq("rowtime" -> ROWTIME, "cf3" -> DOUBLE, "cf1" -> INT))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('cf1, 'proctime.proctime, 'cf3),
    +      Seq("cf1" -> INT, "proctime" -> PROCTIME, "cf3" -> DOUBLE))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('cf3, 'rowtime.rowtime, 'cf1),
    +      Seq("cf3" -> DOUBLE, "rowtime" -> ROWTIME, "cf1" -> INT))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('cf1, 'cf3, 'proctime.proctime),
    +      Seq("cf1" -> INT, "cf3" -> DOUBLE, "proctime" -> PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('cf3, 'cf1, 'rowtime.rowtime),
    +      Seq("cf3" -> DOUBLE, "cf1" -> INT, "rowtime" -> ROWTIME))
    +
    +    // row
    +    util.verifySchema(
    +      util.addTable('proctime.proctime, 'rf1, 'rf3)(TEST_ROW),
    +      Seq("proctime" -> PROCTIME, "rf1" -> INT, "rf3" -> DOUBLE))
    +
    +    util.verifySchema(
    +      util.addTable('rowtime.rowtime, 'rf3, 'rf1)(TEST_ROW),
    +      Seq("rowtime" -> ROWTIME, "rf3" -> DOUBLE, "rf1" -> INT))
    +
    +    util.verifySchema(
    +      util.addTable('rf3, 'proctime.proctime, 'rf1)(TEST_ROW),
    +      Seq("rf3" -> DOUBLE, "proctime" -> PROCTIME, "rf1" -> INT))
    +
    +    util.verifySchema(
    +      util.addTable('rf3, 'rowtime.rowtime, 'rf1)(TEST_ROW),
    +      Seq("rf3" -> DOUBLE, "rowtime" -> ROWTIME, "rf1" -> INT))
    +
    +        util.verifySchema(
    +      util.addTable('rf3, 'rf1, 'proctime.proctime)(TEST_ROW),
    +      Seq("rf3" -> DOUBLE, "rf1" -> INT, "proctime" -> PROCTIME))
    +
    +        util.verifySchema(
    +      util.addTable('rf3, 'rf1, 'rowtime.rowtime)(TEST_ROW),
    +      Seq("rf3" -> DOUBLE, "rf1" -> INT, "rowtime" -> ROWTIME))
    +
    +    // tuple
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('proctime.proctime, 'f0, 'f2),
    +      Seq("proctime" -> PROCTIME, "f0" -> INT, "f2" -> DOUBLE))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('rowtime.rowtime, 'f2, 'f0),
    +      Seq("rowtime" -> ROWTIME, "f2" -> DOUBLE, "f0" -> INT))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('f0, 'proctime.proctime, 'f2),
    +      Seq("f0" -> INT, "proctime" -> PROCTIME, "f2" -> DOUBLE))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('f2, 'rowtime.rowtime, 'f0),
    +      Seq("f2" -> DOUBLE, "rowtime" -> ROWTIME, "f0" -> INT))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('f0, 'f2, 'proctime.proctime),
    +      Seq("f0" -> INT, "f2" -> DOUBLE, "proctime" -> PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('f2, 'f0, 'rowtime.rowtime),
    +      Seq("f2" -> DOUBLE, "f0" -> INT, "rowtime" -> ROWTIME))
    +
    +    // pojo
    +    util.verifySchema(
    +      util.addTable[PojoClass]('proctime.proctime, 'pf1, 'pf3),
    +      Seq("proctime" -> PROCTIME, "pf1" -> INT, "pf3" -> DOUBLE))
    +
    +    util.verifySchema(
    +      util.addTable[PojoClass]('rowtime.rowtime, 'pf3, 'pf1),
    +      Seq("rowtime" -> ROWTIME, "pf3" -> DOUBLE, "pf1" -> INT))
    +
    +    util.verifySchema(
    +      util.addTable[PojoClass]('pf1, 'proctime.proctime, 'pf3),
    +      Seq("pf1" -> INT, "proctime" -> PROCTIME, "pf3" -> DOUBLE))
    +
    +    util.verifySchema(
    +      util.addTable[PojoClass]('pf3, 'rowtime.rowtime, 'pf1),
    +      Seq("pf3" -> DOUBLE, "rowtime" -> ROWTIME, "pf1" -> INT))
    +
    +    util.verifySchema(
    +      util.addTable[PojoClass]('pf1, 'pf3, 'proctime.proctime),
    +      Seq("pf1" -> INT, "pf3" -> DOUBLE, "proctime" -> PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[PojoClass]('pf3, 'pf1, 'rowtime.rowtime),
    +      Seq("pf3" -> DOUBLE, "pf1" -> INT, "rowtime" -> ROWTIME))
    +
    +    // generic
    +    util.verifySchema(
    +      util.addTable[Class[_]]('proctime.proctime, 'mygeneric),
    +      Seq("proctime" -> PROCTIME, "mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
    +
    +    util.verifySchema(
    +      util.addTable[Class[_]]('rowtime.rowtime, 'mygeneric),
    +      Seq("rowtime" -> ROWTIME, "mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
    +
    +    util.verifySchema(
    +      util.addTable[Class[_]]('mygeneric, 'proctime.proctime),
    +      Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]]), "proctime" -> PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[Class[_]]('mygeneric, 'rowtime.rowtime),
    +      Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]]), "rowtime" -> ROWTIME))
    +
    +    // any type info
    +    util.verifySchema(
    +      util.addTable[Unit]('proctime.proctime, 'unit),
    +      Seq("proctime" -> PROCTIME, "unit" -> new UnitTypeInfo()))
    +
    +    util.verifySchema(
    +      util.addTable[Unit]('rowtime.rowtime, 'unit),
    +      Seq("rowtime" -> ROWTIME, "unit" -> new UnitTypeInfo()))
    +
    +    util.verifySchema(
    +      util.addTable[Unit]('unit, 'proctime.proctime),
    +      Seq("unit" -> new UnitTypeInfo(), "proctime" -> PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[Unit]('unit, 'rowtime.rowtime),
    +      Seq("unit" -> new UnitTypeInfo(), "rowtime" -> ROWTIME))
       }
     
       @Test
    -  def testGetFieldInfoAtomic(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(atomicType)
    -
    -    fieldInfo._1.zip(Array("f0")).foreach(x => assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
    +  def testStreamProjectWithReplacingTimeAttributesByName(): Unit = {
    +    val util = streamTestUtil()
    +
    +    // atomic
    +    util.verifySchema(
    +      util.addTable[Int]('myint.proctime),
    +      Seq("myint" -> PROCTIME))
    +
    +    // case class
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('cf1, 'cf3.proctime, 'cf2),
    +      Seq("cf1" -> INT, "cf3" -> PROCTIME, "cf2" -> LONG))
    +
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('cf1, 'cf3.rowtime, 'cf2),
    +      Seq("cf1" -> INT, "cf3" -> ROWTIME, "cf2" -> LONG))
    +
    +    // row
    +    util.verifySchema(
    +      util.addTable('rf1, 'rf3.proctime, 'rf2)(TEST_ROW_WITH_TIME),
    +      Seq("rf1" -> INT, "rf3" -> PROCTIME, "rf2" -> LONG))
    +
    +    util.verifySchema(
    +      util.addTable('rf1, 'rf3.rowtime, 'rf2)(TEST_ROW_WITH_TIME),
    +      Seq("rf1" -> INT, "rf3" -> ROWTIME, "rf2" -> LONG))
    +
    +    // tuple
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, Long, String]]('f0, 'f2.proctime, 'f1),
    +      Seq("f0" -> INT, "f2" -> PROCTIME, "f1" -> LONG))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, Long, String]]('f0, 'f2.rowtime, 'f1),
    +      Seq("f0" -> INT, "f2" -> ROWTIME, "f1" -> LONG))
       }
     
       @Test
    -  def testGetFieldInfoTupleNames(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(
    -      tupleType,
    -      Array(
    -        UnresolvedFieldReference("name1"),
    -        UnresolvedFieldReference("name2"),
    -        UnresolvedFieldReference("name3")
    -      ))
    -
    -    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testAliasByName(): Unit = {
    +    val utils = Seq(streamTestUtil(), batchTestUtil())
    +
    +    utils.foreach { util =>
    +
    +      // atomic
    +      util.verifySchema(
    +        util.addTable[Int]('myint as 'new),
    +        Seq("new" -> INT))
    +
    +      // case class
    +      util.verifySchema(
    +        util.addTable[CClass]('cf1, 'cf3 as 'new, 'cf2),
    +        Seq("cf1" -> INT, "new" -> DOUBLE, "cf2" -> STRING))
    +
    +      // row
    +      util.verifySchema(
    +        util.addTable('rf1, 'rf3 as 'new, 'rf2)(TEST_ROW),
    +        Seq("rf1" -> INT, "new" -> DOUBLE, "rf2" -> STRING))
    +
    +      // tuple
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('f0, 'f2 as 'new, 'f1),
    +        Seq("f0" -> INT, "new" -> DOUBLE, "f1" -> STRING))
    +    }
       }
     
       @Test
    -  def testGetFieldInfoCClassNames(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(
    -      caseClassType,
    -      Array(
    -        UnresolvedFieldReference("name1"),
    -        UnresolvedFieldReference("name2"),
    -        UnresolvedFieldReference("name3")
    -      ))
    -
    -    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testStreamAliasWithAddingTimeAttributesByName(): Unit = {
    +    val util = streamTestUtil()
    +
    +    // atomic
    +    util.verifySchema(
    +      util.addTable[Int](('myint as 'new).proctime),
    +      Seq("new" -> PROCTIME))
    +
    +    // case class
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('cf1, ('newnew as 'new).proctime, 'cf2),
    +      Seq("cf1" -> INT, "new" -> PROCTIME, "cf2" -> LONG))
    +
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('cf1, ('newnew as 'new).rowtime, 'cf2),
    +      Seq("cf1" -> INT, "new" -> ROWTIME, "cf2" -> LONG))
    +
    +    // row
    +    util.verifySchema(
    +      util.addTable('rf1, ('newnew as 'new).proctime, 'rf2)(TEST_ROW_WITH_TIME),
    +      Seq("rf1" -> INT, "new" -> PROCTIME, "rf2" -> LONG))
    +
    +    util.verifySchema(
    +      util.addTable('rf1, ('newnew as 'new).rowtime, 'rf2)(TEST_ROW_WITH_TIME),
    +      Seq("rf1" -> INT, "new" -> ROWTIME, "rf2" -> LONG))
    +
    +    // tuple
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, Long, String]]('f0, ('newnew as 'new).proctime, 'f1),
    +      Seq("f0" -> INT, "new" -> PROCTIME, "f1" -> LONG))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, Long, String]]('f0, ('newnew as 'new).rowtime, 'f1),
    +      Seq("f0" -> INT, "new" -> ROWTIME, "f1" -> LONG))
       }
     
       @Test
    -  def testGetFieldInfoPojoNames2(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(
    -      pojoType,
    -      Array(
    -        UnresolvedFieldReference("pf3"),
    -        UnresolvedFieldReference("pf1"),
    -        UnresolvedFieldReference("pf2")
    -      ))
    -
    -    fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
    +  def testStreamAliasWithReplacingTimeAttributesByName(): Unit = {
    +    val util = streamTestUtil()
    +
    +    // atomic
    +    util.verifySchema(
    +      util.addTable[Int](('myint as 'new).proctime),
    +      Seq("new" -> PROCTIME))
    +
    +    // case class
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('cf1, ('cf3 as 'new).proctime, 'cf2),
    --- End diff --
    
    Should we allow an existing field to be defined as a `proctime` attribute?
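
To make the case in question concrete, here is a minimal standalone sketch of a check that flags schema entries which declare `proctime` on a name already present in the input type. It is not Flink code: `FieldRef`, `ProctimeCheck`, and `proctimeOnExistingField` are hypothetical names introduced only for illustration, and the sketch takes no position on whether such entries should be rejected.

```scala
// Hypothetical model of one schema entry (not a Flink class):
// the referenced name, an optional alias, and an optional time-attribute marker.
final case class FieldRef(
    name: String,
    alias: Option[String] = None,
    timeAttr: Option[String] = None) // assumed values: "proctime" or "rowtime"

object ProctimeCheck {
  // Returns the entries that declare proctime on a field that exists in the input type.
  def proctimeOnExistingField(inputFields: Seq[String], schema: Seq[FieldRef]): Seq[FieldRef] =
    schema.filter(f => f.timeAttr.contains("proctime") && inputFields.contains(f.name))
}
```

Applied to an input with fields `cf1`, `cf2`, `cf3` and the schema `'cf1, ('cf3 as 'new).proctime, 'cf2`, this check would flag the `cf3` entry, because a processing-time attribute would take the place of the value stored in that field.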
      


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-8203
>                 URL: https://issues.apache.org/jira/browse/FLINK-8203
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, the schema of the table can be defined (by default it is extracted from the {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename fields, or define time attributes. Right now, there are several limitations on how the fields can be defined, and these also depend on the type of the {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., tuples, case classes, Row) require schema definition based on the position of fields. POJO types, which have no fixed order of fields, require fields to be referenced by name. Moreover, there are several restrictions on how time attributes can be defined, e.g., an event-time attribute must replace an existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are referenced by name (and possibly renamed using an alias ({{as}})). In this mode, fields can be reordered and projected out. Moreover, we can define proctime and event-time attributes at arbitrary positions using arbitrary names (except those that already exist in the result schema). This mode can be used for any input type, including POJOs. This mode is used if all field references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to existing fields in the input type. In this mode, fields are simply renamed. Event-time attributes can replace the field at their position in the input data (if it is of the correct type) or be appended at the end. Proctime attributes must be appended at the end. This mode can only be used if the input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check all combinations of input types and schema definition modes.
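
The two modes described above can be sketched as a small standalone model. This is an illustration under stated assumptions, not the Flink implementation: `SchemaModes`, `Ref`, `detect`, and `resolve` are hypothetical names, time-attribute entries are assumed to be excluded from the "all references exist" check, atomic input types are not modeled, and none of the type or position checks for time attributes are performed.

```scala
object SchemaModes {
  sealed trait Mode
  case object ByName extends Mode
  case object ByPosition extends Mode

  // Hypothetical schema entry: referenced name, optional alias, and a flag
  // marking the entry as a time attribute (proctime or rowtime).
  final case class Ref(name: String, alias: Option[String] = None, isTimeAttr: Boolean = false)

  // By-name mode is chosen when every plain (non-time-attribute) reference
  // names an input field. Otherwise fall back to by-position mode, which is
  // only possible for input types with a defined field order.
  def detect(
      inputFields: Seq[String],
      hasFieldOrder: Boolean,
      refs: Seq[Ref]): Either[String, Mode] = {
    val plain = refs.filterNot(_.isTimeAttr)
    if (plain.forall(r => inputFields.contains(r.name))) Right(ByName)
    else if (hasFieldOrder) Right(ByPosition)
    else Left("input type has no field order; fields must be referenced by name")
  }

  // Maps each schema entry to its output name and, for plain fields, the index
  // of the input field it reads. Time attributes carry no input index here.
  def resolve(
      inputFields: Seq[String],
      mode: Mode,
      refs: Seq[Ref]): Seq[(String, Option[Int])] =
    refs.zipWithIndex.map { case (r, i) =>
      val out = r.alias.getOrElse(r.name)
      val idx =
        if (r.isTimeAttr) None
        else mode match {
          case ByName     => Some(inputFields.indexOf(r.name)) // look the field up by name
          case ByPosition => Some(i)                           // the i-th entry renames the i-th field
        }
      out -> idx
    }
}
```

For a case class with fields `cf1`, `cf2`, `cf3`, the schema `'cf3, 'cf1` selects by-name mode and reads input fields 2 and 0, whereas `'a, 'b` selects by-position mode and renames fields 0 and 1. For a POJO, which has no field order, a schema that references unknown names yields the error branch.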



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
