flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
Date Mon, 28 Nov 2016 10:17:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15701534#comment-15701534

ASF GitHub Bot commented on FLINK-4469:

Github user wuchong commented on the issue:

    Hi @fhueske @twalthr , I have addressed all the comments and made the following changes:
    1. Forbid `TableFunction`s implemented as Scala objects, since `collect` would be called
on a singleton, which is error-prone in concurrent cases.
    2. Throw correct errors if a SQL query refers to a function in FROM that has not been
registered or that is a `ScalarFunction`.
    3. Clean up `TableFunction` and `ScalarFunction`, **remove `UserDefinedFunction`**,
and move the eval-related functions and `createSqlFunction` to utils.
    4. Rename `ScalarFunctions` to `SqlFunctions`, because it also contains `TableFunction` logic.
    5. Restructure the tests. The Java Table API is tested by comparing the RelNodes of two
tables, and the SQL API's DataSetRel and DataStreamRel are checked via the `TableTestBase`
utils. The tests are consolidated into `stream/UserDefinedTableFunctionITCase`,
`batch/UserDefinedTableFunctionITCase`, `stream/UserDefinedTableFunctionTest`, and
`batch/UserDefinedTableFunctionTest`.
    6. The Scala Table API implicitly converts a `TableFunction` into a `TableFunctionCall`.
`TableFunctionCall` is not an `Expression` or a `LogicalNode`; like `GroupWindow`, it is
visible to users and exposes API such as `as(...)`.
    7. Fix the hierarchy type extraction problem.
    8. Rebase the code and fix conflicts.
    9. ...
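Regarding the hierarchy type extraction problem in item 7: because of Java type erasure, the row type `T` of a UDTF can only be recovered by walking the generic superclass chain via reflection. Below is a minimal, self-contained sketch of that idea. It is not Flink's actual `TypeExtractor`, and the class names (`TableFunction`, `BaseSplitFunction`, `SplitFunction`, `extractRowType`) are hypothetical stand-ins; the up-walk for unresolved type variables is also simplified.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeExtractionSketch {

    // Stand-in for a table function base class; T is the emitted row type.
    static abstract class TableFunction<T> { }

    // An intermediate class in the hierarchy, as in "hierarchy type extraction".
    static abstract class BaseSplitFunction<T> extends TableFunction<T> { }

    // Concrete UDTF fixing T = String.
    static class SplitFunction extends BaseSplitFunction<String> { }

    /** Climb the generic superclass chain until the type argument is concrete. */
    static Class<?> extractRowType(Class<?> udtfClass) {
        Type t = udtfClass.getGenericSuperclass();
        while (t instanceof ParameterizedType) {
            ParameterizedType p = (ParameterizedType) t;
            Type arg = p.getActualTypeArguments()[0];
            if (arg instanceof Class) {
                return (Class<?>) arg; // found a concrete T
            }
            // Still a type variable: go one level up (simplified; a real
            // extractor would track the variable's binding at each level).
            t = ((Class<?>) p.getRawType()).getGenericSuperclass();
        }
        throw new IllegalArgumentException("Cannot extract row type of " + udtfClass);
    }

    public static void main(String[] args) {
        // The type argument survives in SplitFunction's generic superclass
        // (BaseSplitFunction<String>) even though instances are erased.
        System.out.println(extractRowType(SplitFunction.class).getSimpleName());
    }
}
```

The key point is that erasure removes type arguments from instances, but a subclass's `getGenericSuperclass()` still records them, so extraction must follow the class hierarchy rather than inspect objects.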

> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>                 Key: FLINK-4469
>                 URL: https://issues.apache.org/jira/browse/FLINK-4469
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
> Normal user-defined functions, such as concat(), take a single input row and produce
a single output row. In contrast, table-generating functions transform a single input row
into multiple output rows. This is very useful in some cases, such as looking up HBase by
rowkey and returning one or more rows.
> A user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval functions.
> NOTE:
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of Java type
erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Flink will choose the best-matching eval method to
call according to the parameter types and count.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>
>   public Word(String word, Integer length) {
>     this.word = word;
>     this.length = length;
>   }
> }
>
> public class SplitStringUDTF extends UDTF<Word> {
>     // rows are emitted via collect(T), so eval returns void
>     public void eval(String str) {
>         if (str != null) {
>             for (String s : str.split(",")) {
>                 collect(new Word(s, s.length()));
>             }
>         }
>     }
> }
>
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF());
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w, l)");
>
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF());
> // rename split table columns to "w" and "l"
> table.crossApply("split(c) as (w, l)")
>      .select("a, b, w, l");
> // without renaming, the original field names of the POJO/case class/... are used
> table.crossApply("split(c)")
>      .select("a, b, word, length");
>
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>      .select('a, 'b, 'w, 'l)
>
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>      .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#

This message was sent by Atlassian JIRA
