flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6334) Refactoring UDTF interface
Date Mon, 01 May 2017 21:14:05 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991546#comment-15991546 ]

ASF GitHub Bot commented on FLINK-6334:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114188108
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala ---
    @@ -50,72 +50,85 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     
         // Java environment
         val javaEnv = mock(classOf[JavaExecutionEnv])
    -    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
    -    val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
    +    val jtEnv = TableEnvironment.getTableEnvironment(javaEnv)
    +    val in2 = jtEnv.fromDataStream(jDs).as("a, b, c")
     
         // test cross join
         val func1 = new TableFunc1
    -    javaTableEnv.registerFunction("func1", func1)
    +    jtEnv.registerFunction("func1", func1)
         var scalaTable = in1.join(func1('c) as 's).select('c, 's)
    -    var javaTable = in2.join("func1(c).as(s)").select("c, s")
    +    var javaTable = in2.join(jtEnv.tableApply("func1(c)")
    +      .as("s")).select("c, s")
         verifyTableEquals(scalaTable, javaTable)
     
         // test left outer join
         scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
    -    javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
    +    javaTable = in2.leftOuterJoin(
    +      jtEnv.tableApply("func1(c)")
    +        .as("s")
    +    ).select("c, s")
         verifyTableEquals(scalaTable, javaTable)
     
         // test overloading
         scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
    -    javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
    +    javaTable = in2.join(jtEnv.tableApply("func1(c, '$')")
    +      .as("s")).select("c, s")
         verifyTableEquals(scalaTable, javaTable)
     
         // test custom result type
         val func2 = new TableFunc2
    -    javaTableEnv.registerFunction("func2", func2)
    +    jtEnv.registerFunction("func2", func2)
         scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
    -    javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
    +    javaTable = in2.join(
    +      jtEnv.tableApply("func2(c)")
    +        .as("name, len"))
    +      .select("c, name, len")
         verifyTableEquals(scalaTable, javaTable)
     
         // test hierarchy generic type
         val hierarchy = new HierarchyTableFunction
    -    javaTableEnv.registerFunction("hierarchy", hierarchy)
    -    scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
    +    jtEnv.registerFunction("hierarchy", hierarchy)
    +    scalaTable = in1.join(hierarchy('c) as ('name, 'len, 'adult))
           .select('c, 'name, 'len, 'adult)
    -    javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
    +    javaTable = in2.join(jtEnv.tableApply("hierarchy(c)")
    +      .as("name, len, adult"))
           .select("c, name, len, adult")
         verifyTableEquals(scalaTable, javaTable)
     
         // test pojo type
         val pojo = new PojoTableFunc
    -    javaTableEnv.registerFunction("pojo", pojo)
    +    jtEnv.registerFunction("pojo", pojo)
         scalaTable = in1.join(pojo('c))
           .select('c, 'name, 'age)
    -    javaTable = in2.join("pojo(c)")
    +    javaTable = in2.join(jtEnv.tableApply("pojo(c)"))
           .select("c, name, age")
         verifyTableEquals(scalaTable, javaTable)
     
         // test with filter
         scalaTable = in1.join(func2('c) as ('name, 'len))
           .select('c, 'name, 'len).filter('len > 2)
    -    javaTable = in2.join("func2(c) as (name, len)")
    +    javaTable = in2.join(jtEnv.tableApply("func2(c)") as ("name, len"))
           .select("c, name, len").filter("len > 2")
         verifyTableEquals(scalaTable, javaTable)
     
         // test with scalar function
         scalaTable = in1.join(func1('c.substring(2)) as 's)
           .select('a, 'c, 's)
    -    javaTable = in2.join("func1(substring(c, 2)) as (s)")
    +    javaTable = in2.join(jtEnv.tableApply("func1(substring(c, 2))") as ("s"))
           .select("a, c, s")
         verifyTableEquals(scalaTable, javaTable)
     
         // check scala object is forbidden
         expectExceptionThrown(
           tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
         expectExceptionThrown(
    -      javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
    +      jtEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
         expectExceptionThrown(
    -      in1.join(ObjectTableFunction('a, 1)), "Scala object")
    +      {
    --- End diff --
    
    Please avoid unnecessary refactorings. They make PRs harder to review and might be reverted
by the next person going over this code.


> Refactoring UDTF interface
> --------------------------
>
>                 Key: FLINK-6334
>                 URL: https://issues.apache.org/jira/browse/FLINK-6334
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Ruidong Li
>            Assignee: Ruidong Li
>
> The current UDTF leverages the table.join(expression) interface, which is not a proper
interface in terms of semantics. We would like to refactor this to let UDTFs use the table.join(table)
interface. Very briefly, a UDTF's apply method will return a Table, so Join(UDTF('a, 'b,
...) as 'c) shall be viewed as join(Table).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message