flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
Date Wed, 19 Jul 2017 13:06:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093036#comment-16093036 ]

ASF GitHub Bot commented on FLINK-6442:

Github user lincoln-lil commented on the issue:

    @fhueske  I agree with you that we should maintain the consistency of the API.
    For a TableSink, registering its schema before using it sounds reasonable. My concern is that this would be a breaking change to the API, and the new behavior would affect users' existing code.
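    For illustration, pre-registering a sink's schema might look like the following sketch (the registerTableSink signature and the Types constants are assumptions, not a settled API):
{code}
// hypothetical sketch: register a sink together with an explicit schema,
// so that inserts can be validated against the declared fields
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types

val fieldNames = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG, Types.STRING)
tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, new YourSink)
{code}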
    On the other hand, in an RDBMS we can use the SQL 'CREATE TABLE AS' statement to create a table from an existing one by copying its columns. When a table is created this way, it is also populated with the records produced by the SELECT statement.
    This is a common operation, and I think we need to support this functionality as well; see the sketch below.
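    A sketch of how that could surface through the same entry point (hypothetical; CTAS support would only come in a later step):
{code}
// hypothetical: a CTAS statement run through sql(); the new table's schema
// is copied from the SELECT result and the table is populated on creation
tEnv.sql("CREATE TABLE newTable AS SELECT a, b, c FROM sourceTable")
{code}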
    Based on these considerations, I propose to retain the current 'writeToSink' method and keep its current configure behavior to support derived sink schemas, and to add a new 'insertInto' method that supports pre-defined schemas and performs type validation internally (for now only DML INSERT would be supported in SQL; later we can add support for the 'CREATE TABLE AS' statement, so that the Table API and SQL have exactly the same semantics).
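    Side by side, the two paths would look roughly like this (insertInto is the proposed method; its exact signature is still open):
{code}
// kept as-is: the sink schema is derived from the query result
tEnv.sql("SELECT a, b, c FROM sourceTable").writeToSink(new YourStreamSink)

// proposed: insert into a sink registered with a pre-defined schema;
// field types are validated against the registered schema
val result = tEnv.sql("SELECT a, b, c FROM sourceTable")
result.insertInto("targetTable") // hypothetical signature
{code}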
    I agree with distinguishing the insert and select query methods, but I'm concerned about the method name itself: 'sql' covers all types of queries, not only SELECT.
    Standard SQL DML includes SELECT / INSERT / UPDATE / DELETE, so if we need to distinguish the sub-type of the query, I suggest naming the methods select / insert (or, to avoid colliding with the 'select' method in 'table.scala', dmlSelect / dmlInsert), and then doing the corresponding check in each method. The insert method's return type can be declared as Unit rather than null; a sketch of the signatures follows.
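    In signature form (names as suggested above; purely illustrative):
{code}
// hypothetical signatures on TableEnvironment, illustrating the proposed split:
// a select-only entry point that still returns a Table ...
def dmlSelect(query: String): Table = ???
// ... and an insert entry point that validates against the sink schema and returns Unit
def dmlInsert(stmt: String): Unit = ???
{code}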
    What do you think?
    Thanks, Lincoln

> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
> -------------------------------------------------------------------------------
>                 Key: FLINK-6442
>                 URL: https://issues.apache.org/jira/browse/FLINK-6442
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Minor
> Currently the Table API only has a registration method for source tables. When we use SQL to write a streaming job, we have to add an additional part for the sink, e.g. via the Table API:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke the Table API's writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to a DataStream first and then invoke addSink
> val stream = tEnv.sql(sqlQuery).toDataStream[Row]
> stream.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table, because its 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
>     tEnv.registerTableSink("targetTable", new YourSink)
>     val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
>     val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
> insert:
>   ( INSERT | UPSERT ) INTO tablePrimary
>   [ '(' column [, column ]* ')' ]
>   query
> {code}
> I'd like to extend the Flink Table API to support this feature. See the design doc: https://goo.gl/n3phK5
