Date: Wed, 31 May 2017 15:08:06 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-6746) Table API / SQL Docs: Common Page

[ https://issues.apache.org/jira/browse/FLINK-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16031299#comment-16031299 ]

ASF GitHub Bot commented on FLINK-6746:
---------------------------------------

Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4012#discussion_r119369993

--- Diff: docs/dev/table/common.md ---
@@ -98,374 +89,767 @@ env.execute("Your Query")
+**Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
+
{% top %}

Create a TableEnvironment
-------------------------

-A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine Tables of different TableEnvironments.
+The `TableEnvironment` is a central concept of the Table API and SQL integration. It is responsible for:
+* Registering a `Table` in the internal catalog
+* Registering an external catalog
+* Executing SQL queries
+* Registering a user-defined (scalar, table, or aggregation) function
+* Converting a `DataStream` or `DataSet` into a `Table`
+* Holding a reference to an `ExecutionEnvironment` or `StreamExecutionEnvironment`
+
+A `Table` is always bound to a specific `TableEnvironment`. It is not possible to process tables of different TableEnvironments in the same query, e.g., to join or union them.
+
+A `TableEnvironment` is created by calling the static `TableEnvironment.getTableEnvironment()` method with a `StreamExecutionEnvironment` or an `ExecutionEnvironment` and an optional `TableConfig`. The `TableConfig` can be used to configure the `TableEnvironment` or to customize the query optimization and translation process (see [Query Optimization](#query-optimization)).
-**TODO: Extend**
+{% highlight java %}
+// ***************
+// STREAMING QUERY
+// ***************
+StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+// Create a TableEnvironment for streaming queries
+StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);
+
+// ***********
+// BATCH QUERY
+// ***********
+ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+// Create a TableEnvironment for batch queries
+BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
+{% endhighlight %}

+{% highlight scala %}
+// ***************
+// STREAMING QUERY
+// ***************
+val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
+// Create a TableEnvironment for streaming queries
+val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
+
+// ***********
+// BATCH QUERY
+// ***********
+val bEnv = ExecutionEnvironment.getExecutionEnvironment
+// Create a TableEnvironment for batch queries
+val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
+{% endhighlight %}
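[Aside, not part of the quoted diff: the paragraph above mentions an optional `TableConfig`. A minimal sketch of that variant, assuming the two-argument `getTableEnvironment()` overload:]

{% highlight java %}
// sketch: create a TableEnvironment with an explicit TableConfig
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableConfig config = new TableConfig();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config);
{% endhighlight %}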
{% top %}

Register a Table in the Catalog
-------------------------------

-`TableEnvironment`s have an internal table catalog to which tables can be registered with a unique name. After registration, a table can be accessed from the `TableEnvironment` by its name.
+A `TableEnvironment` has an internal catalog in which tables are registered by name. Table API or SQL queries can access tables that are registered in the catalog by referencing them by name.
 
-*Note: `DataSet`s or `DataStream`s can be directly converted into `Table`s without registering them in the `TableEnvironment`. See [Create a Table from a DataStream or DataSet](#tbd) for details.
+A `TableEnvironment` allows registering a table from various sources:
+* an existing `Table` object, usually the result of a Table API or SQL query.
+* a `TableSource`, which accesses external data, such as a file, database, or messaging system.
+* a `DataStream` or `DataSet` from a DataStream or DataSet program.
+
+Registering a `DataStream` or `DataSet` as a table is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.

### Register a Table

-A `Table` that originates from a Table API operation or a SQL query is registered in a `TableEnvironment` as follows:
+A `Table` is registered in a `TableEnvironment` as follows:
{% highlight java %}
-// works for StreamExecutionEnvironment identically
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
-// convert a DataSet into a Table
-Table custT = tableEnv
-  .toTable(custDs, "name, zipcode")
-  .where("zipcode = '12345'")
-  .select("name");
+// Table is the result of a simple projection query
+Table projX = tableEnv.scan("X").select(...);
 
-// register the Table custT as table "custNames"
-tableEnv.registerTable("custNames", custT);
+// register the Table projX as table "projectedX"
+tableEnv.registerTable("projectedX", projX);
{% endhighlight %}
{% highlight scala %}
-// works for StreamExecutionEnvironment identically
-val env = ExecutionEnvironment.getExecutionEnvironment
+// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
 
-// convert a DataSet into a Table
-val custT = custDs
-  .toTable(tableEnv, 'name, 'zipcode)
-  .where('zipcode === "12345")
-  .select('name)
+// Table is the result of a simple projection query
+val projX: Table = tableEnv.scan("X").select(...)
 
-// register the Table custT as table "custNames"
-tableEnv.registerTable("custNames", custT)
+// register the Table projX as table "projectedX"
+tableEnv.registerTable("projectedX", projX)
{% endhighlight %}
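[Aside, not part of the quoted diff: once registered, the table can be referenced by name from later queries, which is what the note below means by inlining. A minimal sketch; the column name `a` is hypothetical:]

{% highlight java %}
// reference the registered table by name in a follow-up query
// (the column name "a" is hypothetical)
Table filtered = tableEnv.scan("projectedX").filter("a > 10");
{% endhighlight %}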
-A registered `Table` that originates from a Table API operation or SQL query is treated similarly as a view as known from relational DBMS, i.e., it can be inlined when optimizing the query.
+**Note:** A registered `Table` is treated similarly to a `VIEW` as known from relational database systems, i.e., the query that defines the `Table` is not optimized but will be inlined when another query references the registered `Table`. If multiple queries reference the same registered `Table`, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered `Table` will *not* be shared.

{% top %}

-### Register a DataSet
+### Register a TableSource
+
+A `TableSource` provides access to external data which is stored in a storage system such as a database (MySQL, HBase, ...), a file with a specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...).
+
+Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for a list of supported TableSources and instructions for how to build a custom `TableSource`.
 
-A `DataSet` is registered as a `Table` in a `BatchTableEnvironment` as follows:
+A `TableSource` is registered in a `TableEnvironment` as follows:
{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
-// register the DataSet cust as table "Customers" with fields derived from the dataset
-tableEnv.registerDataSet("Customers", cust);
+// create a TableSource
+TableSource csvSource = new CsvTableSource("/path/to/file", ...);
 
-// register the DataSet ord as table "Orders" with fields user, product, and amount
-tableEnv.registerDataSet("Orders", ord, "user, product, amount");
+// register the TableSource as table "CsvTable"
+tableEnv.registerTableSource("CsvTable", csvSource);
{% endhighlight %}
{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
+// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
 
-// register the DataSet cust as table "Customers" with fields derived from the dataset
-tableEnv.registerDataSet("Customers", cust)
+// create a TableSource
+val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
 
-// register the DataSet ord as table "Orders" with fields user, product, and amount
-tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)
+// register the TableSource as table "CsvTable"
+tableEnv.registerTableSource("CsvTable", csvSource)
{% endhighlight %}
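[Aside, not part of the quoted diff: a registered TableSource can then be queried like any other table. A minimal sketch; the column name `product` is hypothetical:]

{% highlight java %}
// query the registered TableSource like any other table
// (the column name "product" is hypothetical)
Table result = tableEnv.scan("CsvTable").select("product");
{% endhighlight %}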
-*Note: The name of a `DataSet` `Table` must not match the `^_DataSetTable_[0-9]+` pattern which is reserved for internal use only.*
-
{% top %}

-### Register a DataStream
+Register an External Catalog
+----------------------------
+
+An external catalog can provide information about external databases and tables such as their name, schema, statistics, and information for how to access data stored in an external database, table, or file.
 
-A `DataStream` is registered as a `Table` in a `StreamTableEnvironment` as follows:
+An external catalog can be created by implementing the `ExternalCatalog` interface and is registered in a `TableEnvironment` as follows:
{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
-// register the DataStream cust as table "Customers" with fields derived from the datastream
-tableEnv.registerDataStream("Customers", cust);
+// create an external catalog
+ExternalCatalog catalog = new InMemoryExternalCatalog();
 
-// register the DataStream ord as table "Orders" with fields user, product, and amount
-tableEnv.registerDataStream("Orders", ord, "user, product, amount");
+// register the ExternalCatalog catalog
+tableEnv.registerExternalCatalog("InMemCatalog", catalog);
{% endhighlight %}
{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
 
-// register the DataStream cust as table "Customers" with fields derived from the datastream
-tableEnv.registerDataStream("Customers", cust)
+// create an external catalog
+val catalog: ExternalCatalog = new InMemoryExternalCatalog
 
-// register the DataStream ord as table "Orders" with fields user, product, and amount
-tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)
+// register the ExternalCatalog catalog
+tableEnv.registerExternalCatalog("InMemCatalog", catalog)
{% endhighlight %}
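[Aside, not part of the quoted diff: as the next paragraph explains, catalog tables are addressed by their full path. A sketch of such an access, assuming the path-based `scan(...)` variant; the database and table names are hypothetical:]

{% highlight java %}
// scan a table by its full path: catalog, database, table
// ("db1" and "persons" are hypothetical names)
Table persons = tableEnv.scan("InMemCatalog", "db1", "persons");
{% endhighlight %}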
-*Note: The name of a `DataStream` `Table` must not match the `^_DataStreamTable_[0-9]+` pattern which is reserved for internal use only.*
+Once registered in a `TableEnvironment`, all tables defined in an `ExternalCatalog` can be accessed from Table API or SQL queries by specifying their full path, such as `catalog.database.table`.
+
+Currently, Flink provides an `InMemoryExternalCatalog` for demo and testing purposes. However, the `ExternalCatalog` interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.

{% top %}

-### Register a TableSource
+Query a Table
+-------------
+
+### Table API
+
+The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as Strings but are composed step-by-step in the host language.
+
+The API is based on the `Table` class, which represents a table (streaming or batch) and offers methods to apply relational operations. These methods return a new `Table` object, which represents the result of applying the relational operation to the input `Table`. Some relational operations are composed of multiple method calls, such as `table.groupBy(...).select()`, where `groupBy(...)` specifies a grouping of `table` and `select(...)` the projection on the grouping.
 
-TableSources provided access to data stored in various storage systems such as databases (MySQL, HBase, ...), file formats (CSV, Apache Parquet, Avro, ORC, ...), or messaging systems (Apache Kafka, RabbitMQ, ...). Flink provides a TableSources for common data formats and storage systems. Please have a look at the [Table Sources and Sinks page]({{ site.baseurl }}/dev/table/sourceSinks.html) for a list of provided TableSources and documentation for how to built your own.
+The [Table API]({{ site.baseurl }}/dev/table/tableapi.html) document describes all Table API operations that are supported on streaming and batch tables.
 
-An external table is registered in a `TableEnvironment` using a `TableSource` as follows:
+The following example shows a simple Table API aggregation query:
{% highlight java %}
-// works for StreamExecutionEnvironment identically
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
-TableSource custTS = new CsvTableSource("/path/to/file", ...);
+// register Orders table
 
-// register a `TableSource` as external table "Customers"
-tableEnv.registerTableSource("Customers", custTS);
+// scan registered Orders table
+Table orders = tableEnv.scan("Orders");
+// compute revenue for all customers from France
+Table revenue = orders
+  .filter("cCountry === 'FRANCE'")
+  .groupBy("cID, cName")
+  .select("cID, cName, revenue.sum AS revSum");
+
+// emit or convert Table
+// execute query
{% endhighlight %}
{% highlight scala %}
-// works for StreamExecutionEnvironment identically
-val env = ExecutionEnvironment.getExecutionEnvironment
+// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
 
-val custTS: TableSource = new CsvTableSource("/path/to/file", ...)
+// register Orders table
 
-// register a `TableSource` as external table "Customers"
-tableEnv.registerTableSource("Customers", custTS)
+// scan registered Orders table
+val orders = tableEnv.scan("Orders")
+// compute revenue for all customers from France
+val revenue = orders
+  .filter('cCountry === "FRANCE")
+  .groupBy('cID, 'cName)
+  .select('cID, 'cName, 'revenue.sum AS 'revSum)
+
+// emit or convert Table
+// execute query
{% endhighlight %}

+**Note:** The Scala Table API uses Scala Symbols, which start with a single tick (`'`), to reference the attributes of a `Table`.
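[Aside, not part of the quoted diff: one possible way to fill in the "emit or convert Table" and "execute query" stubs above, assuming a streaming job and the `toRetractStream()` conversion described later on this page:]

{% highlight java %}
// convert the aggregating Table into a retract stream and print it
DataStream<Tuple2<Boolean, Row>> result = tableEnv.toRetractStream(revenue, Row.class);
result.print();
env.execute();
{% endhighlight %}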
-A `TableSource` can provide access to data stored in various storage systems such as databases (MySQL, HBase, ...), file formats (CSV, Apache Parquet, Avro, ORC, ...), or messaging systems (Apache Kafka, RabbitMQ, ...).
-
{% top %}

-Register an External Catalog
-----------------------------
+### SQL
 
-An external catalog is defined by the `ExternalCatalog` interface and provides information about databases and tables such as their name, schema, statistics, and access information. An `ExternalCatalog` is registered in a `TableEnvironment` as follows:
+Flink's SQL integration is based on [Apache Calcite](https://calcite.apache.org), which implements the SQL standard. SQL queries are specified as regular Strings.
+
+The [SQL]({{ site.baseurl }}/dev/table/sql.html) document describes Flink's SQL support for streaming and batch tables.
+
+The following example shows how to specify a query and return the result as Table.

--- End diff --

... as a Table.


> Table API / SQL Docs: Common Page
> ---------------------------------
>
>             Key: FLINK-6746
>             URL: https://issues.apache.org/jira/browse/FLINK-6746
>         Project: Flink
>      Issue Type: Sub-task
>      Components: Documentation, Table API & SQL
> Affects Versions: 1.3.0
>        Reporter: Fabian Hueske
>        Assignee: Fabian Hueske
>
> Update and refine ./docs/dev/table/common.md in feature branch https://github.com/apache/flink/tree/tableDocs


--
This message was sent by Atlassian JIRA
(v6.3.15#6346)