flink-issues mailing list archives

From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...
Date Mon, 18 Sep 2017 20:52:12 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3829#discussion_r139471593
  
    --- Diff: docs/dev/table/sql.md ---
    @@ -71,15 +85,29 @@ val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
     
     // SQL query with an inlined (unregistered) table
     val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
    -val result = tableEnv.sql(
    +val result = tableEnv.sqlQuery(
       s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
     
     // SQL query with a registered table
     // register the DataStream under the name "Orders"
     tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
     // run a SQL query on the Table and retrieve the result as a new Table
    -val result2 = tableEnv.sql(
    +val result2 = tableEnv.sqlQuery(
       "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
    +
    +// SQL update with a registered table
    +// register the DataStream as table "Orders"
    +tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
    +// create a TableSink
    +val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
    +// define the field names and types
    +val fieldNames: Array[String] = Array("id", "product", "amount")
    --- End diff --
    
    Please fix the schema of the result: the field names and types defined for the sink must match the schema of the query result.


---
