flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
Date Wed, 10 Oct 2018 10:03:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644771#comment-16644771 ] 

ASF GitHub Bot commented on FLINK-9712:
---------------------------------------

asfgit closed pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 146d1a6c1fd..10070eebf2a 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -753,7 +753,7 @@ val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](ta
 </div>
 </div>
 
-**Note:** A detailed discussion about dynamic tables and their properties is given in the [Streaming Queries]({{ site.baseurl }}/dev/table/streaming.html) document.
+**Note:** A detailed discussion about dynamic tables and their properties is given in the [Dynamic Tables](streaming/dynamic_tables.html) document.
 
 #### Convert a Table into a DataSet
 
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 16649e52ff6..346fd0b6af9 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -82,7 +82,7 @@ The **connector** describes the external system that stores the data of a table.
 
 Some systems support different **data formats**. For example, a table that is stored in Kafka or in files can encode its rows with CSV, JSON, or Avro. A database connector might need the table schema here. Whether or not a storage system requires the definition of a format, is documented for every [connector](connect.html#table-connectors). Different systems also require different [types of formats](connect.html#table-formats) (e.g., column-oriented formats vs. row-oriented formats). The documentation states which format types and connectors are compatible.
 
-The **table schema** defines the schema of a table that is exposed to SQL queries. It describes how a source maps the data format to the table schema and a sink vice versa. The schema has access to fields defined by the connector or format. It can use one or more fields for extracting or inserting [time attributes](streaming.html#time-attributes). If input fields have no determinstic field order, the schema clearly defines column names, their order, and origin.
+The **table schema** defines the schema of a table that is exposed to SQL queries. It describes how a source maps the data format to the table schema and a sink vice versa. The schema has access to fields defined by the connector or format. It can use one or more fields for extracting or inserting [time attributes](streaming/time_attributes.html). If input fields have no determinstic field order, the schema clearly defines column names, their order, and origin.
 
 The subsequent sections will cover each definition part ([connector](connect.html#table-connectors), [format](connect.html#table-formats), and [schema](connect.html#table-schema)) in more detail. The following example shows how to pass them:
 
@@ -293,7 +293,7 @@ schema:
 
 Time attributes are essential when working with unbounded streaming tables. Therefore both processing-time and event-time (also known as "rowtime") attributes can be defined as part of the schema.
 
-For more information about time handling in Flink and especially event-time, we recommend the general [event-time section](streaming.html#time-attributes).
+For more information about time handling in Flink and especially event-time, we recommend the general [event-time section](streaming/time_attributes.html).
 
 ### Rowtime Attributes
 
@@ -436,7 +436,7 @@ ANY<class, serialized>           # used for type information that is not support
 Update Modes
 ------------
 
-For streaming queries, it is required to declare how to perform the [conversion between a dynamic table and an external connector](streaming.html#dynamic-tables--continuous-queries). The *update mode* specifies which kind of messages should be exchanged with the external system:
+For streaming queries, it is required to declare how to perform the [conversion between a dynamic table and an external connector](streaming/dynamic_tables.html#continuous-queries). The *update mode* specifies which kind of messages should be exchanged with the external system:
 
 **Append Mode:** In append mode, a dynamic table and an external connector only exchange INSERT messages.
 
@@ -463,7 +463,7 @@ tables:
 </div>
 </div>
 
-See also the [general streaming concepts documentation](streaming.html#dynamic-tables--continuous-queries) for more information.
+See also the [general streaming concepts documentation](streaming/dynamic_tables.html#continuous-queries) for more information.
 
 {% top %}
 
@@ -935,7 +935,7 @@ val orcTableSource = OrcTableSource.builder()
 
 The `CsvTableSink` emits a `Table` to one or more CSV files. 
 
-The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming.html#table-to-stream-conversion) for details. When emitting a streaming table, rows are written at least once (if checkpointing is enabled) and the `CsvTableSink` does not split output files into bucket files but continuously writes to the same files. 
+The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion) for details. When emitting a streaming table, rows are written at least once (if checkpointing is enabled) and the `CsvTableSink` does not split output files into bucket files but continuously writes to the same files.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -971,7 +971,7 @@ table.writeToSink(
 
 ### JDBCAppendTableSink
 
-The `JDBCAppendTableSink` emits a `Table` to a JDBC connection. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming.html#table-to-stream-conversion) for details. 
+The `JDBCAppendTableSink` emits a `Table` to a JDBC connection. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion) for details.
 
 The `JDBCAppendTableSink` inserts each `Table` row at least once into the database table (if checkpointing is enabled). However, you can specify the insertion query using <code>REPLACE</code> or <code>INSERT OVERWRITE</code> to perform upsert writes to the database.
 
@@ -1014,7 +1014,7 @@ Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify t
 
 ### CassandraAppendTableSink
 
-The `CassandraAppendTableSink` emits a `Table` to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming.html#table-to-stream-conversion) for details. 
+The `CassandraAppendTableSink` emits a `Table` to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion) for details.
 
 The `CassandraAppendTableSink` inserts all rows at least once into the Cassandra table if checkpointing is enabled. However, you can specify the query as upsert query.
 
diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 85768ab31c1..1c81bfbf712 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -285,7 +285,7 @@ EXISTS (sub-query)
       </td>
       <td>
         <p>Returns TRUE if <i>sub-query</i> returns at least one row. Only supported if the operation can be rewritten in a join and group operation.</p>
-        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
 
@@ -297,7 +297,7 @@ value IN (sub-query)
       </td>
       <td>
         <p>Returns TRUE if <i>value</i> is equal to a row returned by sub-query.</p>
-        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
 
@@ -309,7 +309,7 @@ value NOT IN (sub-query)
       </td>
       <td>
         <p>Returns TRUE if <i>value</i> is not equal to every row returned by <i>sub-query</i>.</p>
-        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     </tbody>
@@ -459,7 +459,7 @@ ANY.in(TABLE)
       </td>
       <td>
         <p>Returns TRUE if <i>ANY</i> is equal to a row returned by sub-query <i>TABLE</i>.</p>
-        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
 
@@ -639,7 +639,7 @@ ANY.in(TABLE)
       </td>
       <td>
         <p>Returns TRUE if <i>ANY</i> is equal to a row returned by sub-query <i>TABLE</i>.</p>
-        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
 
diff --git a/docs/dev/table/index.md b/docs/dev/table/index.md
index 34e94684ecf..3ee9173c467 100644
--- a/docs/dev/table/index.md
+++ b/docs/dev/table/index.md
@@ -73,10 +73,10 @@ Where to go next?
 -----------------
 
 * [Concepts & Common API]({{ site.baseurl }}/dev/table/common.html): Shared concepts and APIs of the Table API and SQL.
-* [Streaming Table API & SQL]({{ site.baseurl }}/dev/table/streaming.html): Streaming-specific documentation for the Table API or SQL such as configuration of time attributes and handling of updating results.
+* [Streaming Concepts]({{ site.baseurl }}/dev/table/streaming): Streaming-specific documentation for the Table API or SQL such as configuration of time attributes and handling of updating results.
 * [Table API]({{ site.baseurl }}/dev/table/tableApi.html): Supported operations and API for the Table API.
 * [SQL]({{ site.baseurl }}/dev/table/sql.html): Supported operations and syntax for SQL
 * [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html): Reading tables from and emitting tables to external storage systems.
 * [User-Defined Functions]({{ site.baseurl }}/dev/table/udfs.html): Definition and usage of user-defined functions.
 
-{% top %}
\ No newline at end of file
+{% top %}
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 7b831b71a9c..0ff71418940 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -139,13 +139,13 @@ StreamTableSource[T] extends TableSource[T] {
 
 ### Defining a TableSource with Time Attributes
 
-Time-based operations of streaming [Table API](tableApi.html#group-windows) and [SQL](sql.html#group-windows) queries, such as windowed aggregations or joins, require explicitly specified [time attributes]({{ site.baseurl }}/dev/table/streaming.html#time-attributes). 
+Time-based operations of streaming [Table API](tableApi.html#group-windows) and [SQL](sql.html#group-windows) queries, such as windowed aggregations or joins, require explicitly specified [time attributes](streaming/time_attributes.html).
 
 A `TableSource` defines a time attribute as a field of type `Types.SQL_TIMESTAMP` in its table schema. In contrast to all regular fields in the schema, a time attribute must not be matched to a physical field in the return type of the table source. Instead, a `TableSource` defines a time attribute by implementing a certain interface.
 
 #### Defining a Processing Time Attribute
 
-[Processing time attributes](streaming.html#processing-time) are commonly used in streaming queries. A processing time attribute returns the current wall-clock time of the operator that accesses it. A `TableSource` defines a processing time attribute by implementing the `DefinedProctimeAttribute` interface. The interface looks as follows:
+[Processing time attributes](streaming/time_attributes.html#processing-time) are commonly used in streaming queries. A processing time attribute returns the current wall-clock time of the operator that accesses it. A `TableSource` defines a processing time attribute by implementing the `DefinedProctimeAttribute` interface. The interface looks as follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -173,7 +173,7 @@ DefinedProctimeAttribute {
 
 #### Defining a Rowtime Attribute
 
-[Rowtime attributes](streaming.html#event-time) are attributes of type `TIMESTAMP` and handled in a unified way in stream and batch queries.
+[Rowtime attributes](streaming/time_attributes.html#event-time) are attributes of type `TIMESTAMP` and handled in a unified way in stream and batch queries.
 
 A table schema field of type `SQL_TIMESTAMP` can be declared as rowtime attribute by specifying 
 
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index a9fd94f10d5..2a40c58f7b5 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -287,7 +287,7 @@ SELECT PRETTY_PRINT(user) FROM Orders
         <span class="label label-info">Result Updating</span>
       </td>
       <td>
-        <p><b>Note:</b> GroupBy on a streaming table produces an updating result. See the <a href="streaming.html">Streaming Concepts</a> page for details.
+        <p><b>Note:</b> GroupBy on a streaming table produces an updating result. See the <a href="streaming/dynamic_tables.html">Dynamic Tables Streaming Concepts</a> page for details.
         </p>
 {% highlight sql %}
 SELECT a, SUM(b) as d
@@ -316,7 +316,7 @@ GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
         <span class="label label-primary">Streaming</span>
       </td>
     	<td>
-        <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time attribute</a></p>
+        <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming/time_attributes.html">time attribute</a></p>
 {% highlight sql %}
 SELECT COUNT(amount) OVER (
   PARTITION BY user
@@ -343,7 +343,7 @@ WINDOW w AS (
 {% highlight sql %}
 SELECT DISTINCT users FROM Orders
 {% endhighlight %}
-       <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+       <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -416,7 +416,7 @@ GROUP BY users
 SELECT *
 FROM Orders INNER JOIN Product ON Orders.productId = Product.id
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -438,7 +438,7 @@ FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
 SELECT *
 FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -449,9 +449,9 @@ FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>), a <code>BETWEEN</code> predicate, or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>), a <code>BETWEEN</code> predicate, or a single equality predicate that compares <a href="streaming/time_attributes.html">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
         <p>For example, the following predicates are valid window join conditions:</p>
-          
+
         <ul>
           <li><code>ltime = rtime</code></li>
           <li><code>ltime &gt;= rtime AND ltime &lt; rtime + INTERVAL '10' MINUTE</code></li>
@@ -502,6 +502,31 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
         <p><b>Note:</b> Currently, only literal <code>TRUE</code> is supported as predicate for a left outer join against a lateral table.</p>
       </td>
     </tr>
+    <tr>
+      <td>
+        <strong>Join with Temporal Table</strong><br>
+        <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p><a href="streaming/temporal_tables.html">Temporal Tables</a> are tables that track changes over time.
+        A <a href="streaming/temporal_tables.html#temporal-table-functions">Temporal Table Function</a> provides access to the state of a temporal table at a specific point in time.
+        The syntax to join a table with a temporal table function is the same as in Join with Table Functions.</p>
+
+        <p>Currently only inner joins with temporal tables are supported.</p>
+        Assuming <strong>Rates</strong> is a <a href="streaming/temporal_tables.html#temporal-table-functions">Temporal Table Function</a></p>
+{% highlight sql %}
+SELECT
+  o_amount, r_rate
+FROM
+  Orders,
+  LATERAL TABLE (Rates(o_proctime))
+WHERE
+  r_currency = o_currency
+{% endhighlight %}
+        <p>For more information please check the more detailed <a href="streaming/temporal_tables.html">Temporal Tables concept description.</a></p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -591,7 +616,7 @@ WHERE product IN (
     SELECT product FROM NewProducts
 )
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
 
@@ -609,7 +634,7 @@ WHERE product EXISTS (
     SELECT product FROM NewProducts
 )
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
   </tbody>
@@ -635,7 +660,7 @@ WHERE product EXISTS (
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-<b>Note:</b> The result of streaming queries must be primarily sorted on an ascending <a href="streaming.html#time-attributes">time attribute</a>. Additional sorting attributes are supported.
+<b>Note:</b> The result of streaming queries must be primarily sorted on an ascending <a href="streaming/time_attributes.html">time attribute</a>. Additional sorting attributes are supported.
 
 {% highlight sql %}
 SELECT *
@@ -728,7 +753,7 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
 
 #### Time Attributes
 
-For SQL queries on streaming tables, the `time_attr` argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the [documentation of time attributes](streaming.html#time-attributes) to learn how to define time attributes.
+For SQL queries on streaming tables, the `time_attr` argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the [documentation of time attributes](streaming/time_attributes.html) to learn how to define time attributes.
 
 For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.
 
@@ -760,7 +785,7 @@ The start and end timestamps of group windows as well as time attributes can be
         <code>SESSION_END(time_attr, interval)</code><br/>
       </td>
       <td><p>Returns the timestamp of the <i>exclusive</i> upper bound of the corresponding tumbling, hopping, or session window.</p>
-        <p><b>Note:</b> The exclusive upper bound timestamp <i>cannot</i> be used as a <a href="streaming.html#time-attributes">rowtime attribute</a> in subsequent time-based operations, such as <a href="#joins">time-windowed joins</a> and <a href="#aggregations">group window or over window aggregations</a>.</p></td>
+        <p><b>Note:</b> The exclusive upper bound timestamp <i>cannot</i> be used as a <a href="streaming/time_attributes.html">rowtime attribute</a> in subsequent time-based operations, such as <a href="#joins">time-windowed joins</a> and <a href="#aggregations">group window or over window aggregations</a>.</p></td>
     </tr>
     <tr>
       <td>
@@ -769,7 +794,7 @@ The start and end timestamps of group windows as well as time attributes can be
         <code>SESSION_ROWTIME(time_attr, interval)</code><br/>
       </td>
       <td><p>Returns the timestamp of the <i>inclusive</i> upper bound of the corresponding tumbling, hopping, or session window.</p>
-      <p>The resulting attribute is a <a href="streaming.html#time-attributes">rowtime attribute</a> that can be used in subsequent time-based operations such as <a href="#joins">time-windowed joins</a> and <a href="#aggregations">group window or over window aggregations</a>.</p></td>
+      <p>The resulting attribute is a <a href="streaming/time_attributes.html">rowtime attribute</a> that can be used in subsequent time-based operations such as <a href="#joins">time-windowed joins</a> and <a href="#aggregations">group window or over window aggregations</a>.</p></td>
     </tr>
     <tr>
       <td>
@@ -777,7 +802,7 @@ The start and end timestamps of group windows as well as time attributes can be
         <code>HOP_PROCTIME(time_attr, interval, interval)</code><br/>
         <code>SESSION_PROCTIME(time_attr, interval)</code><br/>
       </td>
-      <td><p>Returns a <a href="streaming.html#time-attributes">proctime attribute</a> that can be used in subsequent time-based operations such as <a href="#joins">time-windowed joins</a> and <a href="#aggregations">group window or over window aggregations</a>.</p></td>
+      <td><p>Returns a <a href="streaming/time_attributes.html#processing-time">proctime attribute</a> that can be used in subsequent time-based operations such as <a href="#joins">time-windowed joins</a> and <a href="#aggregations">group window or over window aggregations</a>.</p></td>
     </tr>
   </tbody>
 </table>
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 5224842457f..f4d71f6b991 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -74,7 +74,7 @@ The **table mode** materializes results in memory and visualizes them in a regul
 SET execution.result-mode=table;
 {% endhighlight %}
 
-The **changelog mode** does not materialize results and visualizes the result stream that is produced by a [continuous query](streaming.html#dynamic-tables--continuous-queries) consisting of insertions (`+`) and retractions (`-`).
+The **changelog mode** does not materialize results and visualizes the result stream that is produced by a [continuous query](streaming/dynamic_tables.html#continuous-queries) consisting of insertions (`+`) and retractions (`-`).
 
 {% highlight text %}
 SET execution.result-mode=changelog;
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
deleted file mode 100644
index d0fea47cbfd..00000000000
--- a/docs/dev/table/streaming.md
+++ /dev/null
@@ -1,603 +0,0 @@
----
-title: "Streaming Concepts"
-nav-parent_id: tableapi
-nav-pos: 10
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Flink's [Table API](tableApi.html) and [SQL support](sql.html) are unified APIs for batch and stream processing. This means that Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input. Because the relational algebra and SQL were originally designed for batch processing, relational queries on unbounded streaming input are not as well understood as relational queries on bounded batch input. 
-
-On this page, we explain concepts, practical limitations, and stream-specific configuration parameters of Flink's relational APIs on streaming data. 
-
-* This will be replaced by the TOC
-{:toc}
-
-Relational Queries on Data Streams
-----------------------------------
-
-SQL and the relational algebra have not been designed with streaming data in mind. As a consequence, there are few conceptual gaps between relational algebra (and SQL) and stream processing.
-
-<table class="table table-bordered">
-	<tr>
-		<th>Relational Algebra / SQL</th>
-		<th>Stream Processing</th>
-	</tr>
-	<tr>
-		<td>Relations (or tables) are bounded (multi-)sets of tuples.</td>
-		<td>A stream is an infinite sequences of tuples.</td>
-	</tr>
-	<tr>
-		<td>A query that is executed on batch data (e.g., a table in a relational database) has access to the complete input data.</td>
-		<td>A streaming query cannot access all data when is started and has to "wait" for data to be streamed in.</td>
-	</tr>
-	<tr>
-		<td>A batch query terminates after it produced a fixed sized result.</td>
-		<td>A streaming query continuously updates its result based on the received records and never completes.</td>
-	</tr>
-</table>
-
-Despite these differences, processing streams with relational queries and SQL is not impossible. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. A common challenge for caching is to prevent a cache from serving outdated results. A materialized view becomes outdated when the base tables of its definition query are modified. *Eager View Maintenance* is a technique to update materialized views and updates a materialized view as soon as its base tables are updated. 
-
-The connection between eager view maintenance and SQL queries on streams becomes obvious if we consider the following:
-
-- A database table is the result of a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements, often called *changelog stream*.
-- A materialized view is defined as a SQL query. In order to update the view, the query is continuously processes the changelog streams of the view's base relations.
-- The materialized view is the result of the streaming SQL query.
-
-With these points in mind, we introduce Flink's concept of *Dynamic Tables* in the next section.
-
-Dynamic Tables &amp; Continuous Queries
----------------------------------------
-
-*Dynamic tables* are the core concept of Flink's Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic table are changing over time. They can be queried like static batch tables. Querying a dynamic table yields a *Continuous Query*. A continuous query never terminates and produces a dynamic table as result. The query continuously updates its (dynamic) result table to reflect the changes on its input (dynamic) table. Essentially, a continuous query on a dynamic table is very similar to the definition query of a materialized view. 
-
-It is important to note that the result of a continuous query is always semantically equivalent to the result of the same query being executed in batch mode on a snapshot of the input tables.
-
-The following figure visualizes the relationship of streams, dynamic tables, and  continuous queries: 
-
-<center>
-<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/stream-query-stream.png" width="80%">
-</center>
-
-1. A stream is converted into a dynamic table.
-1. A continuous query is evaluated on the dynamic table yielding a new dynamic table.
-1. The resulting dynamic table is converted back into a stream.
-
-**Note:** Dynamic tables are foremost a logical concept. Dynamic tables are not necessarily (fully) materialized during query execution.
-
-In the following, we will explain the concepts of dynamic tables and continuous queries with a stream of click events that have the following schema:
-
-{% highlight plain %}
-[ 
-  user:  VARCHAR,   // the name of the user
-  cTime: TIMESTAMP, // the time when the URL was accessed
-  url:   VARCHAR    // the URL that was accessed by the user
-]
-{% endhighlight %}
-
-### Defining a Table on a Stream
-
-In order to process a stream with a relational query, it has to be converted into a `Table`. Conceptually, each record of the stream is interpreted as an `INSERT` modification on the resulting table. Essentially, we are building a table from an `INSERT`-only changelog stream.
-
-The following figure visualizes how the stream of click event (left-hand side) is converted into a table (right-hand side). The resulting table is continuously growing as more records of the click stream are inserted.
-
-<center>
-<img alt="Append mode" src="{{ site.baseurl }}/fig/table-streaming/append-mode.png" width="60%">
-</center>
-
-**Note:** A table which is defined on a stream is internally not materialized. 
-
-### Continuous Queries
-
-A continuous query is evaluated on a dynamic table and produces a new dynamic table as result. In contrast to a batch query, a continuous query never terminates and updates its result table according to the updates on its input tables. At any point in time, the result of a continuous query is semantically equivalent to the result of the same query being executed in batch mode on a snapshot of the input tables. 
-
-In the following we show two example queries on a `clicks` table that is defined on the stream of click events.
-
-The first query is a simple `GROUP-BY COUNT` aggregation query. It groups the `clicks` table on the `user` field and counts the number of visited URLs. The following figure shows how the query is evaluated over time as the `clicks` table is updated with additional rows.
-
-<center>
-<img alt="Continuous Non-Windowed Query" src="{{ site.baseurl }}/fig/table-streaming/query-groupBy-cnt.png" width="90%">
-</center>
-
-When the query is started, the `clicks` table (left-hand side) is empty. The query starts to compute the result table, when the first row is inserted into the `clicks` table. After the first row `[Mary, ./home]` was inserted, the result table (right-hand side, top) consists of a single row `[Mary, 1]`. When the second row `[Bob, ./cart]` is inserted into the `clicks` table, the query updates the result table and inserts a new row `[Bob, 1]`. The third row `[Mary, ./prod?id=1]` yields an update of an already computed result row such that `[Mary, 1]` is updated to `[Mary, 2]`. Finally, the query inserts a third row `[Liz, 1]` into the result table, when the fourth row is appended to the `clicks` table.
-
-The second query is similar to the first one but groups the `clicks` table in addition to the `user` attribute also on an [hourly tumbling window](./sql.html#group-windows) before it counts the number of URLs (time-based computations such as windows are based on special [time attributes](#time-attributes), which are discussed below.). Again, the figure shows the input and output at different points in time to visualize the changing nature of dynamic tables.
-
-<center>
-<img alt="Continuous Group-Window Query" src="{{ site.baseurl }}/fig/table-streaming/query-groupBy-window-cnt.png" width="100%">
-</center>
-
-As before, the input table `clicks` is shown on the left. The query continuously computes results every hour and updates the result table. The clicks table contains four rows with timestamps (`cTime`) between `12:00:00` and `12:59:59`. The query computes two results rows from this input (one for each `user`) and appends them to the result table. For the next window between `13:00:00` and `13:59:59`, the `clicks` table contains three rows, which results in another two rows being appended to the result table. The result table is updated, as more rows are appended to `clicks` over time.
-
-#### Update and Append Queries
-
-Although the two example queries appear to be quite similar (both compute a grouped count aggregate), they differ in one important aspect: 
-- The first query updates previously emitted results, i.e., the changelog stream that defines the result table contains `INSERT` and `UPDATE` changes. 
-- The second query only appends to the result table, i.e., the changelog stream of the result table only consists of `INSERT` changes.
-
-Whether a query produces an append-only table or an updated table has some implications:
-- Queries that produce update changes usually have to maintain more state (see the following section).
-- The conversion of an append-only table into a stream is different from the conversion of an updated table (see the [Table to Stream Conversion](#table-to-stream-conversion) section). 
-
-#### Query Restrictions
-
-Many, but not all, semantically valid queries can be evaluated as continuous queries on streams. Some queries are too expensive to compute, either due to the size of state that they need to maintain or because computing updates is too expensive.
-
-- **State Size:** Continuous queries are evaluated on unbounded streams and are often supposed to run for weeks or months. Hence, the total amount of data that a continuous query processes can be very large. Queries that have to update previously emitted results need to maintain all emitted rows in order to be able to update them. For instance, the first example query needs to store the URL count for each user to be able to increase the count and sent out a new result when the input table receives a new row. If only registered users are tracked, the number of counts to maintain might not be too high. However, if non-registered users get a unique user name assigned, the number of counts to maintain would grow over time and might eventually cause the query to fail.
-
-{% highlight sql %}
-SELECT user, COUNT(url)
-FROM clicks
-GROUP BY user;
-{% endhighlight %}
-
-- **Computing Updates:** Some queries require to recompute and update a large fraction of the emitted result rows even if only a single input record is added or updated. Clearly, such queries are not well suited to be executed as continuous queries. An example is the following query which computes for each user a `RANK` based on the time of the last click. As soon as the `clicks` table receives a new row, the `lastAction` of the user is updated and a new rank must be computed. However since two rows cannot have the same rank, all lower ranked rows need to be updated as well.
-
-{% highlight sql %}
-SELECT user, RANK() OVER (ORDER BY lastLogin) 
-FROM (
-  SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
-);
-{% endhighlight %}
-
-The [QueryConfig](#query-configuration) section discusses parameters to control the execution of continuous queries. Some parameters can be used to trade the size of maintained state for result accuracy.
-
-### Table to Stream Conversion
-
-A dynamic table can be continuously modified by `INSERT`, `UPDATE`, and `DELETE` changes just like a regular database table. It might be a table with a single row, which is constantly updated, an insert-only table without `UPDATE` and `DELETE` modifications, or anything in between.
-
-When converting a dynamic table into a stream or writing it to an external system, these changes need to be encoded. Flink's Table API and SQL support three ways to encode the changes of a dynamic table:
-
-* **Append-only stream:** A dynamic table that is only modified by `INSERT` changes can be  converted into a stream by emitting the inserted rows. 
-
-* **Retract stream:** A retract stream is a stream with two types of messages, *add messages* and *retract messages*. A dynamic table is converted into an retract stream by encoding an `INSERT` change as add message, a `DELETE` change as retract message, and an `UPDATE` change as a retract message for the updated (previous) row and an add message for the updating (new) row. The following figure visualizes the conversion of a dynamic table into a retract stream.
-
-<center>
-<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/undo-redo-mode.png" width="85%">
-</center>
-<br><br>
-
-* **Upsert stream:** An upsert stream is a stream with two types of messages, *upsert messages* and *delete message*. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with unique key is converted into a dynamic table by encoding `INSERT` and `UPDATE` changes as upsert message and `DELETE` changes as delete message. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages correctly. The main difference to a retract stream is that `UPDATE` changes are encoded with a single message and hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream.
-
-<center>
-<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/redo-mode.png" width="85%">
-</center>
-<br><br>
-
-The API to convert a dynamic table into a `DataStream` is discussed on the [Common Concepts](./common.html#convert-a-table-into-a-datastream) page. Please note that only append and retract streams are supported when converting a dynamic table into a `DataStream`. The `TableSink` interface to emit a dynamic table to an external system are discussed on the [TableSources and TableSinks](./sourceSinks.html#define-a-tablesink) page.
-
-{% top %}
-
-Time Attributes
----------------
-
-Flink is able to process streaming data based on different notions of *time*.
-
-- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps which are attached to each row. The timestamps can encode when an event happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is treated similarly to event time.
-
-For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
-
-Table programs require that the corresponding time characteristic has been specified for the streaming environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-Time-based operations such as windows in both the [Table API]({{ site.baseurl }}/dev/table/tableApi.html#group-windows) and [SQL]({{ site.baseurl }}/dev/table/sql.html#group-windows) require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
-
-Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or are pre-defined when using a `TableSource`. Once a time attribute has been defined at the beginning, it can be referenced as a field and can used in time-based operations.
-
-As long as a time attribute is not modified and is simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and thus can not be used for time-based operations anymore.
-
-### Processing time
-
-Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It neither requires timestamp extraction nor watermark generation.
-
-There are two ways to define a processing time attribute.
-
-#### During DataStream-to-Table Conversion
-
-The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple2<String, String>> stream = ...;
-
-// declare an additional logical field as a processing time attribute
-Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
-
-WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[(String, String)] = ...
-
-// declare an additional logical field as a processing time attribute
-val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
-
-val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
-{% endhighlight %}
-</div>
-</div>
-
-#### Using a TableSource
-
-The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// define a table source with a processing attribute
-public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
-
-	@Override
-	public TypeInformation<Row> getReturnType() {
-		String[] names = new String[] {"Username" , "Data"};
-		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
-		return Types.ROW(names, types);
-	}
-
-	@Override
-	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
-		// create stream 
-		DataStream<Row> stream = ...;
-		return stream;
-	}
-
-	@Override
-	public String getProctimeAttribute() {
-		// field with this name will be appended as a third field 
-		return "UserActionTime";
-	}
-}
-
-// register table source
-tEnv.registerTableSource("UserActions", new UserActionSource());
-
-WindowedTable windowedTable = tEnv
-	.scan("UserActions")
-	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// define a table source with a processing attribute
-class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
-
-	override def getReturnType = {
-		val names = Array[String]("Username" , "Data")
-		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
-		Types.ROW(names, types)
-	}
-
-	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
-		// create stream
-		val stream = ...
-		stream
-	}
-
-	override def getProctimeAttribute = {
-		// field with this name will be appended as a third field 
-		"UserActionTime"
-	}
-}
-
-// register table source
-tEnv.registerTableSource("UserActions", new UserActionSource)
-
-val windowedTable = tEnv
-	.scan("UserActions")
-	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
-{% endhighlight %}
-</div>
-</div>
-
-### Event time
-
-Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage.
-
-Additionally, event time allows for unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular field of a record in a batch environment.
-
-In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
-
-An event time attribute can be defined either during DataStream-to-Table conversion or by using a TableSource. 
-
-#### During DataStream-to-Table Conversion
-
-The event time attribute is defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({{ site.baseurl }}/dev/event_time.html) must have been assigned in the `DataStream` that is converted.
-
-There are two ways of defining the time attribute when converting a `DataStream` into a `Table`. Depending on whether the specified `.rowtime` field name exists in the schema of the `DataStream` or not, the timestamp field is either 
-
-- appended as a new field to the schema or
-- replaces an existing field.
-
-In either case the event time timestamp field will hold the value of the `DataStream` event time timestamp.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// Option 1:
-
-// extract timestamp and assign watermarks based on knowledge of the stream
-DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
-
-// declare an additional logical field as an event time attribute
-Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
-
-
-// Option 2:
-
-// extract timestamp from first field, and assign watermarks based on knowledge of the stream
-DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
-
-// the first field has been used for timestamp extraction, and is no longer necessary
-// replace first field with a logical event time attribute
-Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
-
-// Usage:
-
-WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// Option 1:
-
-// extract timestamp and assign watermarks based on knowledge of the stream
-val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
-
-// declare an additional logical field as an event time attribute
-val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)
-
-
-// Option 2:
-
-// extract timestamp from first field, and assign watermarks based on knowledge of the stream
-val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
-
-// the first field has been used for timestamp extraction, and is no longer necessary
-// replace first field with a logical event time attribute
-val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)
-
-// Usage:
-
-val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
-{% endhighlight %}
-</div>
-</div>
-
-#### Using a TableSource
-
-The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttribute` interface. The `getRowtimeAttribute()` method returns the name of an existing field that carries the event time attribute of the table and is of type `LONG` or `TIMESTAMP`.
-
-Moreover, the `DataStream` returned by the `getDataStream()` method must have watermarks assigned that are aligned with the defined time attribute. Please note that the timestamps of the `DataStream` (the ones which are assigned by a `TimestampAssigner`) are ignored. Only the values of the `TableSource`'s rowtime attribute are relevant.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// define a table source with a rowtime attribute
-public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {
-
-	@Override
-	public TypeInformation<Row> getReturnType() {
-		String[] names = new String[] {"Username", "Data", "UserActionTime"};
-		TypeInformation[] types = 
-		    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
-		return Types.ROW(names, types);
-	}
-
-	@Override
-	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
-		// create stream 
-		// ...
-		// assign watermarks based on the "UserActionTime" attribute
-		DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
-		return stream;
-	}
-
-	@Override
-	public String getRowtimeAttribute() {
-		// Mark the "UserActionTime" attribute as event-time attribute.
-		return "UserActionTime";
-	}
-}
-
-// register the table source
-tEnv.registerTableSource("UserActions", new UserActionSource());
-
-WindowedTable windowedTable = tEnv
-	.scan("UserActions")
-	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// define a table source with a rowtime attribute
-class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
-
-	override def getReturnType = {
-		val names = Array[String]("Username" , "Data", "UserActionTime")
-		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
-		Types.ROW(names, types)
-	}
-
-	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
-		// create stream 
-		// ...
-		// assign watermarks based on the "UserActionTime" attribute
-		val stream = inputStream.assignTimestampsAndWatermarks(...)
-		stream
-	}
-
-	override def getRowtimeAttribute = {
-		// Mark the "UserActionTime" attribute as event-time attribute.
-		"UserActionTime"
-	}
-}
-
-// register the table source
-tEnv.registerTableSource("UserActions", new UserActionSource)
-
-val windowedTable = tEnv
-	.scan("UserActions")
-	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-Query Configuration
--------------------
-
-Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input. In many cases, continuous queries on streaming input are capable of computing accurate results that are identical to offline computed results. However, this is not possible in general case because continuous queries have to restrict the size of the state they are maintaining in order to avoid to run out of storage and to be able to process unbounded streaming data over a long period of time. As a result, a continuous query might only be able to provide approximated results depending on the characteristics of the input data and the query itself.
-
-Flink's Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified via a `QueryConfig` object. The `QueryConfig` can be obtained from the `TableEnvironment` and is passed back when a `Table` is translated, i.e., when it is [transformed into a DataStream](common.html#convert-a-table-into-a-datastream-or-dataset) or [emitted via a TableSink](common.html#emit-a-table).
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-// obtain query configuration from TableEnvironment
-StreamQueryConfig qConfig = tableEnv.queryConfig();
-// set query parameters
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
-
-// define query
-Table result = ...
-
-// create TableSink
-TableSink<Row> sink = ...
-
-// emit result Table via a TableSink
-result.writeToSink(sink, qConfig);
-
-// convert result Table into a DataStream<Row>
-DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
-
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-val tableEnv = TableEnvironment.getTableEnvironment(env)
-
-// obtain query configuration from TableEnvironment
-val qConfig: StreamQueryConfig = tableEnv.queryConfig
-// set query parameters
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
-
-// define query
-val result: Table = ???
-
-// create TableSink
-val sink: TableSink[Row] = ???
-
-// emit result Table via a TableSink
-result.writeToSink(sink, qConfig)
-
-// convert result Table into a DataStream[Row]
-val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
-
-{% endhighlight %}
-</div>
-</div>
-
-In the following we describe the parameters of the `QueryConfig` and how they affect the accuracy and resource consumption of a query.
-
-### Idle State Retention Time
-
-Many queries aggregate or join records on one or more key attributes. When such a query is executed on a stream, the continuous query needs to collect records or maintain partial results per key. If the key domain of the input stream is evolving, i.e., the active key values are changing over time, the continuous query accumulates more and more state as more and more distinct keys are observed. However, often keys become inactive after some time and their corresponding state becomes stale and useless.
-
-For example the following query computes the number of clicks per session.
-
-{% highlight sql %}
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
-{% endhighlight %}
-
-The `sessionId` attribute is used as a grouping key and the continuous query maintains a count for each `sessionId` it observes. The `sessionId` attribute is evolving over time and `sessionId` values are only active until the session ends, i.e., for a limited period of time. However, the continuous query cannot know about this property of `sessionId` and expects that every `sessionId` value can occur at any point of time. It maintains a count for each observed `sessionId` value. Consequently, the total state size of the query is continuously growing as more and more `sessionId` values are observed. 
-
-The *Idle State Retention Time* parameters define for how long the state of a key is retained without being updated before it is removed. For the previous example query, the count of a `sessionId` would be removed as soon as it has not been updated for the configured period of time.
-
-By removing the state of a key, the continuous query completely forgets that it has seen this key before. If a record with a key, whose state has been removed before, is processed, the record will be treated as if it was the first record with the respective key. For the example above this means that the count of a `sessionId` would start again at `0`.
-
-There are two parameters to configure the idle state retention time:
-- The *minimum idle state retention time* defines how long the state of an inactive key is at least kept before it is removed.
-- The *maximum idle state retention time* defines how long the state of an inactive key is at most kept before it is removed.
-
-The parameters are specified as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-StreamQueryConfig qConfig = ...
-
-// set idle state retention time: min = 12 hours, max = 24 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
-
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-val qConfig: StreamQueryConfig = ???
-
-// set idle state retention time: min = 12 hours, max = 24 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
-
-{% endhighlight %}
-</div>
-</div>
-
-Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes.
-
-{% top %}
-
-
diff --git a/docs/dev/table/streaming/dynamic_tables.md b/docs/dev/table/streaming/dynamic_tables.md
new file mode 100644
index 00000000000..43a9127a400
--- /dev/null
+++ b/docs/dev/table/streaming/dynamic_tables.md
@@ -0,0 +1,182 @@
+---
+title: "Dynamic Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Relational Queries on Data Streams
+----------------------------------
+
+SQL and the relational algebra have not been designed with streaming data in mind. As a consequence, there are few conceptual gaps between relational algebra (and SQL) and stream processing.
+
+<table class="table table-bordered">
+	<tr>
+		<th>Relational Algebra / SQL</th>
+		<th>Stream Processing</th>
+	</tr>
+	<tr>
+		<td>Relations (or tables) are bounded (multi-)sets of tuples.</td>
+		<td>A stream is an infinite sequences of tuples.</td>
+	</tr>
+	<tr>
+		<td>A query that is executed on batch data (e.g., a table in a relational database) has access to the complete input data.</td>
+		<td>A streaming query cannot access all data when is started and has to "wait" for data to be streamed in.</td>
+	</tr>
+	<tr>
+		<td>A batch query terminates after it produced a fixed sized result.</td>
+		<td>A streaming query continuously updates its result based on the received records and never completes.</td>
+	</tr>
+</table>
+
+Despite these differences, processing streams with relational queries and SQL is not impossible. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. A common challenge for caching is to prevent a cache from serving outdated results. A materialized view becomes outdated when the base tables of its definition query are modified. *Eager View Maintenance* is a technique to update materialized views and updates a materialized view as soon as its base tables are updated.
+
+The connection between eager view maintenance and SQL queries on streams becomes obvious if we consider the following:
+
+- A database table is the result of a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements, often called *changelog stream*.
+- A materialized view is defined as a SQL query. In order to update the view, the query is continuously processes the changelog streams of the view's base relations.
+- The materialized view is the result of the streaming SQL query.
+
+With these points in mind, we introduce following concept of *Dynamic tables* in the next section.
+
+Dynamic Tables &amp; Continuous Queries
+---------------------------------------
+
+*Dynamic tables* are the core concept of Flink's Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic table are changing over time. They can be queried like static batch tables. Querying a dynamic table yields a *Continuous Query*. A continuous query never terminates and produces a dynamic table as result. The query continuously updates its (dynamic) result table to reflect the changes on its input (dynamic) table. Essentially, a continuous query on a dynamic table is very similar to the definition query of a materialized view.
+
+It is important to note that the result of a continuous query is always semantically equivalent to the result of the same query being executed in batch mode on a snapshot of the input tables.
+
+The following figure visualizes the relationship of streams, dynamic tables, and  continuous queries:
+
+<center>
+<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/stream-query-stream.png" width="80%">
+</center>
+
+1. A stream is converted into a dynamic table.
+1. A continuous query is evaluated on the dynamic table yielding a new dynamic table.
+1. The resulting dynamic table is converted back into a stream.
+
+**Note:** Dynamic tables are foremost a logical concept. Dynamic tables are not necessarily (fully) materialized during query execution.
+
+In the following, we will explain the concepts of dynamic tables and continuous queries with a stream of click events that have the following schema:
+
+{% highlight plain %}
+[
+  user:  VARCHAR,   // the name of the user
+  cTime: TIMESTAMP, // the time when the URL was accessed
+  url:   VARCHAR    // the URL that was accessed by the user
+]
+{% endhighlight %}
+
+Defining a Table on a Stream
+----------------------------
+
+In order to process a stream with a relational query, it has to be converted into a `Table`. Conceptually, each record of the stream is interpreted as an `INSERT` modification on the resulting table. Essentially, we are building a table from an `INSERT`-only changelog stream.
+
+The following figure visualizes how the stream of click event (left-hand side) is converted into a table (right-hand side). The resulting table is continuously growing as more records of the click stream are inserted.
+
+<center>
+<img alt="Append mode" src="{{ site.baseurl }}/fig/table-streaming/append-mode.png" width="60%">
+</center>
+
+**Note:** A table which is defined on a stream is internally not materialized.
+
+### Continuous Queries
+----------------------
+
+A continuous query is evaluated on a dynamic table and produces a new dynamic table as result. In contrast to a batch query, a continuous query never terminates and updates its result table according to the updates on its input tables. At any point in time, the result of a continuous query is semantically equivalent to the result of the same query being executed in batch mode on a snapshot of the input tables.
+
+In the following we show two example queries on a `clicks` table that is defined on the stream of click events.
+
+The first query is a simple `GROUP-BY COUNT` aggregation query. It groups the `clicks` table on the `user` field and counts the number of visited URLs. The following figure shows how the query is evaluated over time as the `clicks` table is updated with additional rows.
+
+<center>
+<img alt="Continuous Non-Windowed Query" src="{{ site.baseurl }}/fig/table-streaming/query-groupBy-cnt.png" width="90%">
+</center>
+
+When the query is started, the `clicks` table (left-hand side) is empty. The query starts to compute the result table, when the first row is inserted into the `clicks` table. After the first row `[Mary, ./home]` was inserted, the result table (right-hand side, top) consists of a single row `[Mary, 1]`. When the second row `[Bob, ./cart]` is inserted into the `clicks` table, the query updates the result table and inserts a new row `[Bob, 1]`. The third row `[Mary, ./prod?id=1]` yields an update of an already computed result row such that `[Mary, 1]` is updated to `[Mary, 2]`. Finally, the query inserts a third row `[Liz, 1]` into the result table, when the fourth row is appended to the `clicks` table.
+
+The second query is similar to the first one but groups the `clicks` table in addition to the `user` attribute also on an [hourly tumbling window](../sql.html#group-windows) before it counts the number of URLs (time-based computations such as windows are based on special [time attributes](time_attributes.html), which are discussed later.). Again, the figure shows the input and output at different points in time to visualize the changing nature of dynamic tables.
+
+<center>
+<img alt="Continuous Group-Window Query" src="{{ site.baseurl }}/fig/table-streaming/query-groupBy-window-cnt.png" width="100%">
+</center>
+
+As before, the input table `clicks` is shown on the left. The query continuously computes results every hour and updates the result table. The clicks table contains four rows with timestamps (`cTime`) between `12:00:00` and `12:59:59`. The query computes two results rows from this input (one for each `user`) and appends them to the result table. For the next window between `13:00:00` and `13:59:59`, the `clicks` table contains three rows, which results in another two rows being appended to the result table. The result table is updated, as more rows are appended to `clicks` over time.
+
+### Update and Append Queries
+
+Although the two example queries appear to be quite similar (both compute a grouped count aggregate), they differ in one important aspect:
+- The first query updates previously emitted results, i.e., the changelog stream that defines the result table contains `INSERT` and `UPDATE` changes.
+- The second query only appends to the result table, i.e., the changelog stream of the result table only consists of `INSERT` changes.
+
+Whether a query produces an append-only table or an updated table has some implications:
+- Queries that produce update changes usually have to maintain more state (see the following section).
+- The conversion of an append-only table into a stream is different from the conversion of an updated table (see the [Table to Stream Conversion](#table-to-stream-conversion) section).
+
+### Query Restrictions
+
+Many, but not all, semantically valid queries can be evaluated as continuous queries on streams. Some queries are too expensive to compute, either due to the size of state that they need to maintain or because computing updates is too expensive.
+
+- **State Size:** Continuous queries are evaluated on unbounded streams and are often supposed to run for weeks or months. Hence, the total amount of data that a continuous query processes can be very large. Queries that have to update previously emitted results need to maintain all emitted rows in order to be able to update them. For instance, the first example query needs to store the URL count for each user to be able to increase the count and sent out a new result when the input table receives a new row. If only registered users are tracked, the number of counts to maintain might not be too high. However, if non-registered users get a unique user name assigned, the number of counts to maintain would grow over time and might eventually cause the query to fail.
+
+{% highlight sql %}
+SELECT user, COUNT(url)
+FROM clicks
+GROUP BY user;
+{% endhighlight %}
+
+- **Computing Updates:** Some queries require to recompute and update a large fraction of the emitted result rows even if only a single input record is added or updated. Clearly, such queries are not well suited to be executed as continuous queries. An example is the following query which computes for each user a `RANK` based on the time of the last click. As soon as the `clicks` table receives a new row, the `lastAction` of the user is updated and a new rank must be computed. However since two rows cannot have the same rank, all lower ranked rows need to be updated as well.
+
+{% highlight sql %}
+SELECT user, RANK() OVER (ORDER BY lastLogin)
+FROM (
+  SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
+);
+{% endhighlight %}
+
+The [Query Configuration](query_configuration.html) page discusses parameters to control the execution of continuous queries. Some parameters can be used to trade the size of maintained state for result accuracy.
+
+Table to Stream Conversion
+--------------------------
+
+A dynamic table can be continuously modified by `INSERT`, `UPDATE`, and `DELETE` changes just like a regular database table. It might be a table with a single row, which is constantly updated, an insert-only table without `UPDATE` and `DELETE` modifications, or anything in between.
+
+When converting a dynamic table into a stream or writing it to an external system, these changes need to be encoded. Flink's Table API and SQL support three ways to encode the changes of a dynamic table:
+
+* **Append-only stream:** A dynamic table that is only modified by `INSERT` changes can be  converted into a stream by emitting the inserted rows.
+
+* **Retract stream:** A retract stream is a stream with two types of messages, *add messages* and *retract messages*. A dynamic table is converted into an retract stream by encoding an `INSERT` change as add message, a `DELETE` change as retract message, and an `UPDATE` change as a retract message for the updated (previous) row and an add message for the updating (new) row. The following figure visualizes the conversion of a dynamic table into a retract stream.
+
+<center>
+<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/undo-redo-mode.png" width="85%">
+</center>
+<br><br>
+
+* **Upsert stream:** An upsert stream is a stream with two types of messages, *upsert messages* and *delete message*. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with unique key is converted into a dynamic table by encoding `INSERT` and `UPDATE` changes as upsert message and `DELETE` changes as delete message. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages correctly. The main difference to a retract stream is that `UPDATE` changes are encoded with a single message and hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream.
+
+<center>
+<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/redo-mode.png" width="85%">
+</center>
+<br><br>
+
+The API to convert a dynamic table into a `DataStream` is discussed on the [Common Concepts](../common.html#convert-a-table-into-a-datastream) page. Please note that only append and retract streams are supported when converting a dynamic table into a `DataStream`. The `TableSink` interface to emit a dynamic table to an external system are discussed on the [TableSources and TableSinks](../sourceSinks.html#define-a-tablesink) page.
+
+{% top %}
diff --git a/docs/dev/table/streaming/index.md b/docs/dev/table/streaming/index.md
new file mode 100644
index 00000000000..1d8821d408c
--- /dev/null
+++ b/docs/dev/table/streaming/index.md
@@ -0,0 +1,42 @@
+---
+title: "Streaming Concepts"
+nav-id: streaming_tableapi
+nav-parent_id: tableapi
+nav-pos: 10
+is_beta: false
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink's [Table API](../tableApi.html) and [SQL support](../sql.html) are unified APIs for batch and stream processing.
+This means that Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input.
+Because the relational algebra and SQL were originally designed for batch processing,
+relational queries on unbounded streaming input are not as well understood as relational queries on bounded batch input.
+
+Where to go next?
+-----------------
+
+In the following pages, we explain concepts, practical limitations, and stream-specific configuration parameters of Flink's relational APIs on streaming data.
+
+* [Dynamic Tables]({{ site.baseurl }}/dev/table/streaming/dynamic_tables.html): Describes the concept of Dynamic Tables.
+* [Time attributes]({{ site.baseurl }}/dev/table/streaming/time_attributes.html): How time attributes are handled in Table API & SQL.
+* [Joins in Continuous Queries]({{ site.baseurl }}/dev/table/streaming/joins.html): Different supported types of Joins in Continuous Queries.
+* [Temporal Tables]({{ site.baseurl }}/dev/table/streaming/temporal_tables.html): Describes the Temporal Table concept.
+* [Query configuration]({{ site.baseurl }}/dev/table/streaming/query_configuration.html): Lists Table API & SQL specific configuration options.
diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md
new file mode 100644
index 00000000000..cd32bce8097
--- /dev/null
+++ b/docs/dev/table/streaming/joins.md
@@ -0,0 +1,104 @@
+---
+title: "Joins in Continuous Queries"
+nav-parent_id: streaming_tableapi
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+When we have two tables that we want to connect such operation can usually be expressed via some kind of join.
+In batch processing joins can be efficiently executed, since we are working on a bounded completed data sets.
+In stream processing things are a little bit more complicated,
+especially when it comes to the issue how to handle that data can change over time.
+Because of that, there are a couple of ways to actually perform the join using either Table API or SQL.
+
+For more information regarding the syntax please check the Joins sections in [Table API](../tableApi.html#joins) and [SQL](../sql.html#joins).
+
+* This will be replaced by the TOC
+{:toc}
+
+Regular Joins
+-------------
+
+This is the most basic case in which any new records or changes to either side of the join input are visible
+and are affecting the whole join result.
+For example, if there is a new record on the left side,
+it will be joined with all of the previous and future records on the other side.
+
+These semantics have an important implication:
+it requires to keep both sides of the join input in the state indefinitely
+and resource usage will grow indefinitely as well,
+if one or both input tables are continuously growing.
+
+Example:
+{% highlight sql %}
+SELECT * FROM Orders
+INNER JOIN Product
+ON Orders.productId = Product.id
+{% endhighlight %}
+
+Time-windowed Joins
+-------------------
+
+A time-windowed join is defined by a join predicate,
+that checks if [the time attributes](time_attributes.html) of the input records are within a time-window.
+Since time attributes are quasi-monontic increasing,
+Flink can remove old values from the state without affecting the correctness of the result.
+
+Example:
+{% highlight sql %}
+SELECT *
+FROM
+  Orders o,
+  Shipments s
+WHERE o.id = s.orderId AND
+      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
+{% endhighlight %}
+
+Join with a Temporal Table
+--------------------------
+
+A Temporal Table Join joins an append-only table (left input/probe side) with a [Temporal Tables](temporal_tables.html) (right input/build side),
+i.e. a table that changes over time and tracks its changes.
+For each record from the probe side, it will be joined only with the latest version of the build side.
+That means (in contrast to [Regular Joins](#regular-joins)) if there is a new record on the build side,
+it will not affect the previous results of the join.
+This again allow Flink to limit the number of elements that must be kept on the state.
+In order to support updates (overwrites) of previous values on the build side table, this table must define a primary key.
+
+Compared to [Time-windowed Joins](#time-windowed-joins),
+Temporal Table Joins are not defining a time window within which bounds the records will be joined.
+Records from the probe side are joined with the most recent versions of the build side and records on the build side might be arbitrary old.
+As time passes the previous, no longer needed, versions of the record (for the given primary key) will be removed from the state.
+
+Such behaviour makes temporal table join a good candidate to express stream enrichment.
+
+Example:
+{% highlight sql %}
+SELECT
+  o.amount * r.rate AS amount
+FROM
+  Orders AS o,
+  LATERAL TABLE (Rates(o.proctime)) AS r
+WHERE r.currency = o.currency
+{% endhighlight %}
+
+For more information about this concept please check [Temporal Tables](temporal_tables.html) page.
+
+
diff --git a/docs/dev/table/streaming/query_configuration.md b/docs/dev/table/streaming/query_configuration.md
new file mode 100644
index 00000000000..87640b2f49c
--- /dev/null
+++ b/docs/dev/table/streaming/query_configuration.md
@@ -0,0 +1,130 @@
+---
+title: "Query Configuration"
+nav-parent_id: streaming_tableapi
+nav-pos: 5
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input. In many cases, continuous queries on streaming input are capable of computing accurate results that are identical to offline computed results. However, this is not possible in general case because continuous queries have to restrict the size of the state they are maintaining in order to avoid to run out of storage and to be able to process unbounded streaming data over a long period of time. As a result, a continuous query might only be able to provide approximated results depending on the characteristics of the input data and the query itself.
+
+Flink's Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified via a `QueryConfig` object. The `QueryConfig` can be obtained from the `TableEnvironment` and is passed back when a `Table` is translated, i.e., when it is [transformed into a DataStream](../common.html#convert-a-table-into-a-datastream-or-dataset) or [emitted via a TableSink](../common.html#emit-a-table).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// obtain query configuration from TableEnvironment
+StreamQueryConfig qConfig = tableEnv.queryConfig();
+// set query parameters
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
+
+// define query
+Table result = ...
+
+// create TableSink
+TableSink<Row> sink = ...
+
+// emit result Table via a TableSink
+result.writeToSink(sink, qConfig);
+
+// convert result Table into a DataStream<Row>
+DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// obtain query configuration from TableEnvironment
+val qConfig: StreamQueryConfig = tableEnv.queryConfig
+// set query parameters
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
+
+// define query
+val result: Table = ???
+
+// create TableSink
+val sink: TableSink[Row] = ???
+
+// emit result Table via a TableSink
+result.writeToSink(sink, qConfig)
+
+// convert result Table into a DataStream[Row]
+val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
+
+{% endhighlight %}
+</div>
+</div>
+
+In the following we describe the parameters of the `QueryConfig` and how they affect the accuracy and resource consumption of a query.
+
+Idle State Retention Time
+-------------------------
+
+Many queries aggregate or join records on one or more key attributes. When such a query is executed on a stream, the continuous query needs to collect records or maintain partial results per key. If the key domain of the input stream is evolving, i.e., the active key values are changing over time, the continuous query accumulates more and more state as more and more distinct keys are observed. However, often keys become inactive after some time and their corresponding state becomes stale and useless.
+
+For example the following query computes the number of clicks per session.
+
+{% highlight sql %}
+SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+{% endhighlight %}
+
+The `sessionId` attribute is used as a grouping key and the continuous query maintains a count for each `sessionId` it observes. The `sessionId` attribute is evolving over time and `sessionId` values are only active until the session ends, i.e., for a limited period of time. However, the continuous query cannot know about this property of `sessionId` and expects that every `sessionId` value can occur at any point of time. It maintains a count for each observed `sessionId` value. Consequently, the total state size of the query is continuously growing as more and more `sessionId` values are observed.
+
+The *Idle State Retention Time* parameters define for how long the state of a key is retained without being updated before it is removed. For the previous example query, the count of a `sessionId` would be removed as soon as it has not been updated for the configured period of time.
+
+By removing the state of a key, the continuous query completely forgets that it has seen this key before. If a record with a key, whose state has been removed before, is processed, the record will be treated as if it was the first record with the respective key. For the example above this means that the count of a `sessionId` would start again at `0`.
+
+There are two parameters to configure the idle state retention time:
+- The *minimum idle state retention time* defines how long the state of an inactive key is at least kept before it is removed.
+- The *maximum idle state retention time* defines how long the state of an inactive key is at most kept before it is removed.
+
+The parameters are specified as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+StreamQueryConfig qConfig = ...
+
+// set idle state retention time: min = 12 hours, max = 24 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val qConfig: StreamQueryConfig = ???
+
+// set idle state retention time: min = 12 hours, max = 24 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
+
+{% endhighlight %}
+</div>
+</div>
+
+Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes.
+
+{% top %}
diff --git a/docs/dev/table/streaming/temporal_tables.md b/docs/dev/table/streaming/temporal_tables.md
new file mode 100644
index 00000000000..e6853e9a7ba
--- /dev/null
+++ b/docs/dev/table/streaming/temporal_tables.md
@@ -0,0 +1,286 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Temporal Tables represent a concept of a table that changes over time.
+Flink can keep track of those changes and allows for accessing the table's content at a certain point in time within a query
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+----------
+
+Let's assume that we have the following tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` is an append-only table that represents payments for given `amount` and given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ======== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever changing append-only stream of currency exchange rates, with respect to `Yen` (which has a rate of `1`).
+For example exchange rate for a period from `09:00` to `10:45` of `Euro` to `Yen` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+Task is now to calculate a value of all of the `Orders` converted to common currency (`Yen`).
+For example we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without using Temporal Tables in order to do so, one would need to write such query:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM Rates AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query,
+speed up it's execution and reduce state usage.
+
+In order to define a Temporal Table, we must define it's primary key,
+Primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be a primary key for `RatesHistory` table.
+Secondly a [time attribute](time_attributes.html) is also required,
+that determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in the Temporal Table,
+one must pass a time attribute that determines the version of the table that will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all of existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we defined a `Rates(timeAttribute)` Temporal Table Function based on `RatesHistory` table.
+We could query such function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ======== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ======== ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of the `Rates` for the given `timeAttribute`:
+
+**Note**: Currently Flink doesn't support directly querying the Temporal Table Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+Above example was used to provide an intuition about what function `Rates(timeAttribute)` returns.
+
+Processing time
+---------------
+
+### Defining Temporal Table Function
+
+In order to define processing time Temporal Table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.functions.TemporalTableFunction;
+(...)
+
+// Get the stream and table environments.
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+// Provide static data set of orders table.
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+// Provide static data set of rates history table.
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
+ratesHistoryData.add(Tuple2.of("Euro", 114L));
+ratesHistoryData.add(Tuple2.of("Yen", 1L));
+ratesHistoryData.add(Tuple2.of("Euro", 116L));
+ratesHistoryData.add(Tuple2.of("Euro", 119L));
+
+// Create and register example tables using above data sets.
+// In the real setup, you should replace this with your own tables.
+DataStream<Tuple2<Long, String>> ordersStream = env.fromCollection(ordersData);
+Table orders = tEnv.fromDataStream(ordersStream, "o_amount, o_currency, o_proctime.proctime");
+
+DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
+Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");
+
+tEnv.registerTable("Orders", orders);
+tEnv.registerTable("RatesHistory", ratesHistory);
+
+// Create and register TemporalTableFunction. It will used "r_proctime" as the time attribute
+// and "r_currency" as the primary key.
+TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
+tEnv.registerFunction("Rates", rates);                                                              // <==== (2)
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Get the stream and table environments.
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+// Provide static data set of orders table.
+val ordersData = new mutable.MutableList[(Long, String)]
+ordersData.+=((2L, "Euro"))
+ordersData.+=((1L, "US Dollar"))
+ordersData.+=((50L, "Yen"))
+ordersData.+=((3L, "Euro"))
+ordersData.+=((5L, "US Dollar"))
+
+// Provide static data set of rates history table.
+val ratesHistoryData = new mutable.MutableList[(String, Long)]
+ratesHistoryData.+=(("US Dollar", 102L))
+ratesHistoryData.+=(("Euro", 114L))
+ratesHistoryData.+=(("Yen", 1L))
+ratesHistoryData.+=(("Euro", 116L))
+ratesHistoryData.+=(("Euro", 119L))
+
+// Create and register example tables using above data sets.
+// In the real setup, you should replace this with your own tables.
+val orders = env
+  .fromCollection(ordersData)
+  .toTable(tEnv, 'o_amount, 'o_currency, 'o_proctime.proctime)
+val ratesHistory = env
+  .fromCollection(ratesHistoryData)
+  .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)
+
+tEnv.registerTable("Orders", orders)
+tEnv.registerTable("RatesHistory", ratesHistory)
+
+// Create and register TemporalTableFunction. It will used "r_proctime" as the time attribute
+// and "r_currency" as the primary key.
+val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // <==== (1)
+tEnv.registerFunction("Rates", rates)                                          // <==== (2)
+{% endhighlight %}
+</div>
+</div>
+
+In the line `(1)` we created a `rates` [Temporal Table Function](#temporal-table-functions).
+This allows us to use `rates` function in Table API.
+Line `(2)` registers this function under `Rates` name in our table environment,
+which allows us to use `Rates` function in SQL.
+
+### Joining with Temporal Table Function
+
+After [defining Temporal Table Function](#defining-temporal-table-function) we can start using it.
+Temporal Table Functions can be used in the same way how normal Table Functions would be used.
+For example to solve our motivating problem of converting currencies from `Orders` table,
+we could:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT
+  SUM(o_amount * r_rate) AS amount
+FROM
+  Orders,
+  LATERAL TABLE (Rates(o_proctime))
+WHERE
+  r_currency = o_currency
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Table result = orders
+    .join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")
+    .select("o_amount * r_rate").as("amount")
+    .sum("amount")
+    .toAppendStream<Row>()
+    .print();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val result = orders
+    .join(rates('o_proctime), 'r_currency === 'o_currency)
+    .select('o_amount * 'r_rate).as('amount)
+    .sum('amount)
+    .toAppendStream[Row]
+    .print
+{% endhighlight %}
+</div>
+</div>
+
+With processing time it is impossible to pass "past" time attributes as an argument to the Temporal Table Function.
+By definition it is always a current timestamp.
+Thus processing time Temporal Table Function invocations will always return the latest known versions of the underlying table
+and any updates in the underlying history table will also immediately overwrite the current values.
+Those new updates will have no effect on the previously results emitted/processed records from the probe side.
+
+One can think about processing time Temporal Join as a simple `HashMap<K, V>`
+that stores all of the records from the build side.
+When a new record from the build side have the same key as some previous record,
+the old value is just simply overwritten.
+Every record from the probe side is always evaluated against the most recent/current state of the `HashMap`.
+
+#### Resource usage
+
+Only the latest version (with respect to the defined primary key) of the build side records are being kept on the state.
diff --git a/docs/dev/table/streaming/time_attributes.md b/docs/dev/table/streaming/time_attributes.md
new file mode 100644
index 00000000000..0af57ba051b
--- /dev/null
+++ b/docs/dev/table/streaming/time_attributes.md
@@ -0,0 +1,328 @@
+---
+title: "Time Attributes"
+nav-parent_id: streaming_tableapi
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink is able to process streaming data based on different notions of *time*.
+
+- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps which are attached to each row. The timestamps can encode when an event happened.
+- *Ingestion time* is the time that events enter Flink; internally, it is treated similarly to event time.
+
+For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
+
+Table programs require that the corresponding time characteristic has been specified for the streaming environment:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
+
+// alternatively:
+// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
+
+// alternatively:
+// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
+</div>
+
+Time-based operations such as windows in both the [Table API]({{ site.baseurl }}/dev/table/tableApi.html#group-windows) and [SQL]({{ site.baseurl }}/dev/table/sql.html#group-windows) require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
+
+Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or are pre-defined when using a `TableSource`. Once a time attribute has been defined at the beginning, it can be referenced as a field and can used in time-based operations.
+
+As long as a time attribute is not modified and is simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and thus can not be used for time-based operations anymore.
+
+Processing time
+---------------
+
+Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It neither requires timestamp extraction nor watermark generation.
+
+There are two ways to define a processing time attribute.
+
+### During DataStream-to-Table Conversion
+
+The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, String>> stream = ...;
+
+// declare an additional logical field as a processing time attribute
+Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
+
+WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream: DataStream[(String, String)] = ...
+
+// declare an additional logical field as a processing time attribute
+val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
+
+val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
+{% endhighlight %}
+</div>
+</div>
+
+### Using a TableSource
+
+The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// define a table source with a processing attribute
+public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
+
+	@Override
+	public TypeInformation<Row> getReturnType() {
+		String[] names = new String[] {"Username" , "Data"};
+		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
+		return Types.ROW(names, types);
+	}
+
+	@Override
+	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+		// create stream
+		DataStream<Row> stream = ...;
+		return stream;
+	}
+
+	@Override
+	public String getProctimeAttribute() {
+		// field with this name will be appended as a third field
+		return "UserActionTime";
+	}
+}
+
+// register table source
+tEnv.registerTableSource("UserActions", new UserActionSource());
+
+WindowedTable windowedTable = tEnv
+	.scan("UserActions")
+	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// define a table source with a processing attribute
+class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
+
+	override def getReturnType = {
+		val names = Array[String]("Username" , "Data")
+		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
+		Types.ROW(names, types)
+	}
+
+	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+		// create stream
+		val stream = ...
+		stream
+	}
+
+	override def getProctimeAttribute = {
+		// field with this name will be appended as a third field
+		"UserActionTime"
+	}
+}
+
+// register table source
+tEnv.registerTableSource("UserActions", new UserActionSource)
+
+val windowedTable = tEnv
+	.scan("UserActions")
+	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
+{% endhighlight %}
+</div>
+</div>
+
+Event time
+----------
+
+Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage.
+
+Additionally, event time allows for unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular field of a record in a batch environment.
+
+In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
+
+An event time attribute can be defined either during DataStream-to-Table conversion or by using a TableSource.
+
+### During DataStream-to-Table Conversion
+
+The event time attribute is defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({{ site.baseurl }}/dev/event_time.html) must have been assigned in the `DataStream` that is converted.
+
+There are two ways of defining the time attribute when converting a `DataStream` into a `Table`. Depending on whether the specified `.rowtime` field name exists in the schema of the `DataStream` or not, the timestamp field is either
+
+- appended as a new field to the schema or
+- replaces an existing field.
+
+In either case the event time timestamp field will hold the value of the `DataStream` event time timestamp.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// Option 1:
+
+// extract timestamp and assign watermarks based on knowledge of the stream
+DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
+
+// declare an additional logical field as an event time attribute
+Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
+
+
+// Option 2:
+
+// extract timestamp from first field, and assign watermarks based on knowledge of the stream
+DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
+
+// the first field has been used for timestamp extraction, and is no longer necessary
+// replace first field with a logical event time attribute
+Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
+
+// Usage:
+
+WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// Option 1:
+
+// extract timestamp and assign watermarks based on knowledge of the stream
+val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
+
+// declare an additional logical field as an event time attribute
+val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)
+
+
+// Option 2:
+
+// extract timestamp from first field, and assign watermarks based on knowledge of the stream
+val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
+
+// the first field has been used for timestamp extraction, and is no longer necessary
+// replace first field with a logical event time attribute
+val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)
+
+// Usage:
+
+val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
+{% endhighlight %}
+</div>
+</div>
+
+### Using a TableSource
+
+The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttribute` interface. The `getRowtimeAttribute()` method returns the name of an existing field that carries the event time attribute of the table and is of type `LONG` or `TIMESTAMP`.
+
+Moreover, the `DataStream` returned by the `getDataStream()` method must have watermarks assigned that are aligned with the defined time attribute. Please note that the timestamps of the `DataStream` (the ones which are assigned by a `TimestampAssigner`) are ignored. Only the values of the `TableSource`'s rowtime attribute are relevant.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// define a table source with a rowtime attribute
+public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {
+
+	@Override
+	public TypeInformation<Row> getReturnType() {
+		String[] names = new String[] {"Username", "Data", "UserActionTime"};
+		TypeInformation[] types =
+		    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
+		return Types.ROW(names, types);
+	}
+
+	@Override
+	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+		// create stream
+		// ...
+		// assign watermarks based on the "UserActionTime" attribute
+		DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
+		return stream;
+	}
+
+	@Override
+	public String getRowtimeAttribute() {
+		// Mark the "UserActionTime" attribute as event-time attribute.
+		return "UserActionTime";
+	}
+}
+
+// register the table source
+tEnv.registerTableSource("UserActions", new UserActionSource());
+
+WindowedTable windowedTable = tEnv
+	.scan("UserActions")
+	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// define a table source with a rowtime attribute
+class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
+
+	override def getReturnType = {
+		val names = Array[String]("Username" , "Data", "UserActionTime")
+		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
+		Types.ROW(names, types)
+	}
+
+	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+		// create stream
+		// ...
+		// assign watermarks based on the "UserActionTime" attribute
+		val stream = inputStream.assignTimestampsAndWatermarks(...)
+		stream
+	}
+
+	override def getRowtimeAttribute = {
+		// Mark the "UserActionTime" attribute as event-time attribute.
+		"UserActionTime"
+	}
+}
+
+// register the table source
+tEnv.registerTableSource("UserActions", new UserActionSource)
+
+val windowedTable = tEnv
+	.scan("UserActions")
+	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f50b0f54554..821471daaf2 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -24,9 +24,9 @@ under the License.
 
 The Table API is a unified, relational API for stream and batch processing. Table API queries can be run on batch or streaming input without modifications. The Table API is a super set of the SQL language and is specially designed for working with Apache Flink. The Table API is a language-integrated API for Scala and Java. Instead of specifying queries as String values as common with SQL, Table API queries are defined in a language-embedded style in Java or Scala with IDE support like autocompletion and syntax validation. 
 
-The Table API shares many concepts and parts of its API with Flink's SQL integration. Have a look at the [Common Concepts & API]({{ site.baseurl }}/dev/table/common.html) to learn how to register tables or to create a `Table` object. The [Streaming Concepts]({{ site.baseurl }}/dev/table/streaming.html) page discusses streaming specific concepts such as dynamic tables and time attributes.
+The Table API shares many concepts and parts of its API with Flink's SQL integration. Have a look at the [Common Concepts & API]({{ site.baseurl }}/dev/table/common.html) to learn how to register tables or to create a `Table` object. The [Streaming Concepts](./streaming) pages discuss streaming specific concepts such as dynamic tables and time attributes.
 
-The following examples assume a registered table called `Orders` with attributes `(a, b, c, rowtime)`. The `rowtime` field is either a logical [time attribute](streaming.html#time-attributes) in streaming or a regular timestamp field in batch.
+The following examples assume a registered table called `Orders` with attributes `(a, b, c, rowtime)`. The `rowtime` field is either a logical [time attribute](streaming/time_attributes.html) in streaming or a regular timestamp field in batch.
 
 * This will be replaced by the TOC
 {:toc}
@@ -137,7 +137,7 @@ val result: Table = orders
 </div>
 </div>
 
-Since the Table API is a unified API for batch and streaming data, both example programs can be executed on batch and streaming inputs without any modification of the table program itself. In both cases, the program produces the same results given that streaming records are not late (see [Streaming Concepts](streaming.html) for details).
+Since the Table API is a unified API for batch and streaming data, both example programs can be executed on batch and streaming inputs without any modification of the table program itself. In both cases, the program produces the same results given that streaming records are not late (see [Streaming Concepts](streaming) for details).
 
 {% top %}
 
@@ -329,7 +329,7 @@ val result = orders.where('b === "red")
 Table orders = tableEnv.scan("Orders");
 Table result = orders.groupBy("a").select("a, b.sum as d");
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -367,7 +367,7 @@ Table result = orders
       .as("w"))
     .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate
 {% endhighlight %}
-       <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time attribute</a>.</p>
+       <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming/time_attributes.html">time attribute</a>.</p>
       </td>
     </tr>
     <tr>
@@ -405,7 +405,7 @@ Table orders = tEnv.scan("Orders");
 tEnv.registerFunction("myUdagg", new MyUdagg());
 orders.groupBy("users").select("users, myUdagg.distinct(points) as myDistinctResult");
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -420,7 +420,7 @@ orders.groupBy("users").select("users, myUdagg.distinct(points) as myDistinctRes
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
   </tbody>
@@ -450,7 +450,7 @@ Table result = orders.distinct();
 val orders: Table = tableEnv.scan("Orders")
 val result = orders.groupBy('a).select('a, 'b.sum as 'd)
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -488,7 +488,7 @@ val result: Table = orders
       as 'w)
     .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate
 {% endhighlight %}
-       <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time attribute</a>.</p>
+       <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming/time_attributes.html">time attribute</a>.</p>
       </td>
     </tr>
     <tr>
@@ -526,7 +526,7 @@ val orders: Table = tEnv.scan("Orders");
 val myUdagg = new MyUdagg();
 orders.groupBy('users).select('users, myUdagg.distinct('points) as 'myDistinctResult);
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -540,7 +540,7 @@ orders.groupBy('users).select('users, myUdagg.distinct('points) as 'myDistinctRe
 val orders: Table = tableEnv.scan("Orders")
 val result = orders.distinct()
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
   </tbody>
@@ -576,7 +576,7 @@ Table left = tableEnv.fromDataSet(ds1, "a, b, c");
 Table right = tableEnv.fromDataSet(ds2, "d, e, f");
 Table result = left.join(right).where("a = d").select("a, b, e");
 {% endhighlight %}
-<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
 
@@ -597,7 +597,7 @@ Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
 Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
 Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
 {% endhighlight %}
-<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -608,7 +608,7 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a single equality predicate that compares <a href="streaming/time_attributes.html">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
         <p>For example, the following predicates are valid window join conditions:</p>
 
         <ul>
@@ -669,6 +669,31 @@ Table result = orders
 {% endhighlight %}
       </td>
     </tr>
+    <tr>
+      <td>
+        <strong>Join with Temporal Table</strong><br>
+        <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p><a href="streaming/temporal_tables.html">Temporal Tables</a> are tables that track changes over time.
+        A <a href="streaming/temporal_tables.html#temporal-table-functions">Temporal Table Function</a> provides access to the state of a temporal table at a specific point in time.
+        The syntax to join a table with a Temporal Table Function is the same as in Join with Table Functions.</p>
+
+        <p>Currently only inner joins with temporal tables are supported.</p>
+{% highlight java %}
+Table ratesHistory = tableEnv.scan("RatesHistory");
+// register temporal table function
+TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
+tableEnv.registerFunction("rates", rates);
+
+// join
+Table orders = tableEnv.scan("Orders");
+Table result = orders
+    .join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")
+{% endhighlight %}
+        <p>For more information please check the more detailed <a href="streaming/temporal_tables.html">Temporal Tables concept description.</a></p>
+      </td>
+    </tr>
 
   </tbody>
 </table>
@@ -698,7 +723,7 @@ val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
 val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
 val result = left.join(right).where('a === 'd).select('a, 'b, 'e)
 {% endhighlight %}
-<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -718,7 +743,7 @@ val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
 val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
 val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
 {% endhighlight %}
-<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -729,7 +754,7 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a single equality predicate that compares <a href="streaming/time_attributes.html">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
         <p>For example, the following predicates are valid window join conditions:</p>
 
         <ul>
@@ -786,6 +811,31 @@ val result: Table = table
       </td>
     </tr>
 
+    <tr>
+      <td>
+        <strong>Join with Temporal Table</strong><br>
+        <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p><a href="streaming/temporal_tables.html">Temporal Tables</a> are tables that track their changes over time.
+        A <a href="streaming/temporal_tables.html#temporal-table-functions">Temporal Table Function</a> provides access to the state of a temporal table at a specific point in time.
+        The syntax to join a table with a Temporal Table Function is the same as in Join with Table Functions.</p>
+
+        <p>Currently only inner joins with temporal tables are supported.</p>
+{% highlight scala %}
+val ratesHistory = tableEnv.scan("RatesHistory")
+// register temporal table function
+val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency)
+
+// join
+val orders = tableEnv.scan("Orders")
+val result = orders
+    .join(rates('o_rowtime), 'r_currency === 'o_currency)
+{% endhighlight %}
+        <p>For more information please check the more detailed <a href="streaming/temporal_tables.html">Temporal Tables concept description.</a></p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -915,7 +965,7 @@ tableEnv.registerTable("RightTable", right);
 Table result = left.select("a, b, c").where("a.in(RightTable)");
 {% endhighlight %}
 
-        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
   </tbody>
@@ -1035,7 +1085,7 @@ val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
 val right = ds2.toTable(tableEnv, 'a)
 val result = left.select('a, 'b, 'c).where('a.in(right))
 {% endhighlight %}
-        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
       </td>
     </tr>
 
@@ -1310,7 +1360,7 @@ Tumbling windows are defined by using the `Tumble` class as follows:
     </tr>
     <tr>
       <td><code>on</code></td>
-      <td>The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a <a href="streaming.html#time-attributes">declared event-time or processing-time time attribute</a>.</td>
+      <td>The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a <a href="streaming/time_attributes.html">declared event-time or processing-time time attribute</a>.</td>
     </tr>
     <tr>
       <td><code>as</code></td>
@@ -1372,7 +1422,7 @@ Sliding windows are defined by using the `Slide` class as follows:
     </tr>
     <tr>
       <td><code>on</code></td>
-      <td>The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a <a href="streaming.html#time-attributes">declared event-time or processing-time time attribute</a>.</td>
+      <td>The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a <a href="streaming/time_attributes.html">declared event-time or processing-time time attribute</a>.</td>
     </tr>
     <tr>
       <td><code>as</code></td>
@@ -1430,7 +1480,7 @@ A session window is defined by using the `Session` class as follows:
     </tr>
     <tr>
       <td><code>on</code></td>
-      <td>The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a <a href="streaming.html#time-attributes">declared event-time or processing-time time attribute</a>.</td>
+      <td>The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a <a href="streaming/time_attributes.html">declared event-time or processing-time time attribute</a>.</td>
     </tr>
     <tr>
       <td><code>as</code></td>
@@ -1514,7 +1564,7 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
       <td>
         <p>Defines the order of rows within each partition and thereby the order in which the aggregate functions are applied to rows.</p>
 
-        <p><b>Note:</b> For streaming queries this must be a <a href="streaming.html#time-attributes">declared event-time or processing-time time attribute</a>. Currently, only a single sort attribute is supported.</p>
+        <p><b>Note:</b> For streaming queries this must be a <a href="streaming/time_attributes.html">declared event-time or processing-time time attribute</a>. Currently, only a single sort attribute is supported.</p>
       </td>
     </tr>
     <tr>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> -----------------------------------------------
>
>                 Key: FLINK-9712
>                 URL: https://issues.apache.org/jira/browse/FLINK-9712
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API &amp; SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Mime
View raw message