flink-commits mailing list archives

From fhue...@apache.org
Subject flink git commit: [FLINK-6747] [docs] Add documentation for dynamic tables.
Date Fri, 21 Jul 2017 23:25:58 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 20ddb46b7 -> 6856875e7

[FLINK-6747] [docs] Add documentation for dynamic tables.

This closes #4365

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6856875e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6856875e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6856875e

Branch: refs/heads/release-1.3
Commit: 6856875e7691c55b0ce14bf90dfa687eb15cb539
Parents: 20ddb46
Author: Fabian Hueske <fhueske@apache.org>
Authored: Mon Jul 17 19:11:04 2017 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Sat Jul 22 01:24:54 2017 +0200

 docs/dev/table/streaming.md                     | 161 ++++++++++++++++++-
 docs/fig/table-streaming/append-mode.png        | Bin 0 -> 124294 bytes
 docs/fig/table-streaming/query-groupBy-cnt.png  | Bin 0 -> 314119 bytes
 .../query-groupBy-window-cnt.png                | Bin 0 -> 199277 bytes
 docs/fig/table-streaming/redo-mode.png          | Bin 0 -> 92415 bytes
 .../fig/table-streaming/stream-query-stream.png | Bin 0 -> 99139 bytes
 docs/fig/table-streaming/undo-redo-mode.png     | Bin 0 -> 81057 bytes
 7 files changed, 153 insertions(+), 8 deletions(-)

diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index 1677dad..cb77af8 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -22,21 +22,166 @@ specific language governing permissions and limitations
 under the License.
-**TO BE DONE:** Intro
+Flink's [Table API](tableApi.html) and [SQL support](sql.html) are unified APIs for batch
and stream processing. This means that Table API and SQL queries have the same semantics regardless
of whether their input is bounded batch input or unbounded stream input. Because the relational
algebra and SQL were originally designed for batch processing, relational queries on unbounded
streaming input are not as well understood as relational queries on bounded batch input.
+On this page, we explain concepts, practical limitations, and stream-specific configuration
parameters of Flink's relational APIs on streaming data. 
 * This will be replaced by the TOC
-Dynamic Table
+Relational Queries on Data Streams
+SQL and the relational algebra were not designed with streaming data in mind. As a consequence,
there are a few conceptual gaps between relational algebra (and SQL) and stream processing.
+<table class="table table-bordered">
+	<tr>
+		<th>Relational Algebra / SQL</th>
+		<th>Stream Processing</th>
+	</tr>
+	<tr>
+		<td>Relations (or tables) are bounded (multi-)sets of tuples.</td>
+		<td>A stream is an infinite sequence of tuples.</td>
+	</tr>
+	<tr>
+		<td>A query that is executed on batch data (e.g., a table in a relational database)
has access to the complete input data.</td>
+		<td>A streaming query cannot access all data when it is started and has to "wait" for
data to be streamed in.</td>
+	</tr>
+	<tr>
+		<td>A batch query terminates after it has produced a fixed-size result.</td>
+		<td>A streaming query continuously updates its result based on the received records
and never completes.</td>
+	</tr>
+</table>
+Despite these differences, processing streams with relational queries and SQL is not impossible.
Advanced relational database systems offer a feature called *Materialized Views*. A materialized
view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual
view, a materialized view caches the result of the query such that the query does not need
to be evaluated when the view is accessed. A common challenge for caching is to prevent a
cache from serving outdated results. A materialized view becomes outdated when the base tables
of its definition query are modified. *Eager View Maintenance* is a technique that updates a materialized
view as soon as its base tables are updated.
+The connection between eager view maintenance and SQL queries on streams becomes obvious
if we consider the following:
+- A database table is the result of a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements,
often called *changelog stream*.
+- A materialized view is defined as a SQL query. In order to update the view, the query
continuously processes the changelog streams of the view's base relations.
+- The materialized view is the result of the streaming SQL query.
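The three points above can be illustrated with a small Python simulation. This is a minimal sketch, not Flink code: the `(op, key, row)` changelog format, the `run` helper, and the `CountByUserView` class are hypothetical names chosen for illustration. A table is rebuilt from a stream of `INSERT`, `UPDATE`, and `DELETE` changes, and a `GROUP BY user` count view is maintained eagerly from the same changelog rather than recomputed from scratch.

```python
from collections import Counter

# Hypothetical changelog format: (op, key, row) with op in INSERT/UPDATE/DELETE.
changelog = [
    ("INSERT", 1, {"user": "Mary", "url": "./home"}),
    ("INSERT", 2, {"user": "Bob", "url": "./cart"}),
    ("UPDATE", 2, {"user": "Mary", "url": "./cart"}),
    ("DELETE", 1, None),
]


class CountByUserView:
    """Eagerly maintained view for: SELECT user, COUNT(*) FROM base GROUP BY user."""

    def __init__(self):
        self.counts = Counter()

    def on_change(self, old_row, new_row):
        # Remove the contribution of the previous version of the row (UPDATE/DELETE).
        if old_row is not None:
            self.counts[old_row["user"]] -= 1
            if self.counts[old_row["user"]] == 0:
                del self.counts[old_row["user"]]
        # Add the contribution of the new version of the row (INSERT/UPDATE).
        if new_row is not None:
            self.counts[new_row["user"]] += 1


def run(changes):
    """Apply a changelog to an in-memory table (key -> row) and keep the view up to date."""
    table, view = {}, CountByUserView()
    for op, key, row in changes:
        old_row = table.get(key)
        if op in ("INSERT", "UPDATE"):
            table[key] = row
        elif op == "DELETE":
            del table[key]
        else:
            raise ValueError(f"unknown operation: {op}")
        view.on_change(old_row, table.get(key))
    return table, dict(view.counts)


table, view = run(changelog)
# The eagerly maintained view matches a full recomputation over the final table.
assert view == dict(Counter(r["user"] for r in table.values()))
print(view)  # {'Mary': 1}
```

Maintaining the view incrementally only touches the counts of the users affected by each change, which is the same idea a continuous query relies on.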
+With these points in mind, we introduce Flink's concept of *Dynamic Tables* in the next section.
+Dynamic Tables & Continuous Queries
+*Dynamic tables* are the core concept of Flink's Table API and SQL support for streaming
data. In contrast to the static tables that represent batch data, dynamic tables change
over time. They can be queried like static batch tables. Querying a dynamic table yields a
*Continuous Query*. A continuous query never terminates and produces a dynamic table as its result.
The query continuously updates its (dynamic) result table to reflect the changes on its input
(dynamic) table. Essentially, a continuous query on a dynamic table is very similar to the
definition query of a materialized view. 
+It is important to note that the result of a continuous query is always semantically equivalent
to the result of the same query being executed in batch mode on a snapshot of the input tables.
+The following figure visualizes the relationship of streams, dynamic tables, and continuous queries:
+<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/stream-query-stream.png">
+1. A stream is converted into a dynamic table.
+1. A continuous query is evaluated on the dynamic table yielding a new dynamic table.
+1. The resulting dynamic table is converted back into a stream.
+**Note:** Dynamic tables are first and foremost a logical concept. Dynamic tables are not necessarily
(fully) materialized during query execution.
+In the following, we will explain the concepts of dynamic tables and continuous queries with
a stream of click events that have the following schema:
+  user:  VARCHAR,   // the name of the user
+  cTime: TIMESTAMP, // the time when the URL was accessed
+  url:   VARCHAR    // the URL that was accessed by the user
+### Defining a Table on a Stream
+In order to process a stream with a relational query, it has to be converted into a `Table`.
Conceptually, each record of the stream is interpreted as an `INSERT` modification on the
resulting table. Essentially, we are building a table from an `INSERT`-only changelog stream.
+The following figure visualizes how the stream of click events (left-hand side) is converted
into a table (right-hand side). The resulting table is continuously growing as more records
of the click stream are inserted.
+<img alt="Append mode" src="{{ site.baseurl }}/fig/table-streaming/append-mode.png" width="60%">
+**Note:** A table that is defined on a stream is not materialized internally.
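A minimal Python sketch of this interpretation follows. The `Click` record mirrors the schema above, and `stream_to_table` is a hypothetical helper, not part of Flink's API. Every incoming record is treated as an `INSERT`, so each snapshot of the table extends the previous one. The timestamps and Liz's URL are made-up illustrative values.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Click:
    user: str        # the name of the user
    cTime: datetime  # the time when the URL was accessed
    url: str         # the URL that was accessed by the user


def stream_to_table(stream):
    """Interpret each stream record as an INSERT and yield a snapshot after every change."""
    table = []  # append-only: rows are never updated or deleted
    for record in stream:
        table.append(record)  # INSERT
        yield tuple(table)    # immutable snapshot of the dynamic table at this point


# Illustrative click stream (timestamps and Liz's URL are hypothetical).
clicks = [
    Click("Mary", datetime(2017, 1, 1, 12, 0, 0), "./home"),
    Click("Bob", datetime(2017, 1, 1, 12, 0, 0), "./cart"),
    Click("Mary", datetime(2017, 1, 1, 12, 2, 0), "./prod?id=1"),
    Click("Liz", datetime(2017, 1, 1, 12, 3, 0), "./home"),
]

snapshots = list(stream_to_table(clicks))
print([len(s) for s in snapshots])  # [1, 2, 3, 4]
```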
+### Continuous Queries
+A continuous query is evaluated on a dynamic table and produces a new dynamic table as result.
In contrast to a batch query, a continuous query never terminates and updates its result table
according to the updates on its input tables. At any point in time, the result of a continuous
query is semantically equivalent to the result of the same query being executed in batch mode
on a snapshot of the input tables. 
+In the following, we show two example queries on a `clicks` table that is defined on the stream
of click events.
+The first query is a simple `GROUP BY COUNT` aggregation query. It groups the `clicks` table
on the `user` field and counts the number of visited URLs. The following figure shows how
the query is evaluated over time as the `clicks` table is updated with additional rows.
+<img alt="Continuous Non-Windowed Query" src="{{ site.baseurl }}/fig/table-streaming/query-groupBy-cnt.png">
+When the query is started, the `clicks` table (left-hand side) is empty. The query starts
to compute the result table when the first row is inserted into the `clicks` table. After
the first row `[Mary, ./home]` has been inserted, the result table (right-hand side, top) consists
of a single row `[Mary, 1]`. When the second row `[Bob, ./cart]` is inserted into the `clicks`
table, the query updates the result table and inserts a new row `[Bob, 1]`. The third row
`[Mary, ./prod?id=1]` yields an update of an already computed result row such that `[Mary, 1]`
is updated to `[Mary, 2]`. Finally, the query inserts a third row `[Liz, 1]` into the
result table when the fourth row is appended to the `clicks` table.
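The evaluation described above can be sketched in plain Python. The helpers `continuous_count`, `batch_count`, and `apply_change` are hypothetical and do not reflect how Flink implements the operator. The continuous version keeps one count per user as state and emits an `INSERT` for a new user and an `UPDATE` for a known one; after every input row, the materialized result equals the batch query evaluated on the rows seen so far. Liz's URL is a made-up value.

```python
from collections import Counter


def continuous_count(clicks):
    """Continuously evaluate: SELECT user, COUNT(url) FROM clicks GROUP BY user.

    Yields ("INSERT", new_row) for a user seen for the first time and
    ("UPDATE", old_row, new_row) for a user whose count changes.
    """
    counts = {}  # operator state: current count per user
    for user, _url in clicks:
        old = counts.get(user)
        counts[user] = 1 if old is None else old + 1
        if old is None:
            yield ("INSERT", (user, 1))
        else:
            yield ("UPDATE", (user, old), (user, old + 1))


def batch_count(clicks):
    """The same query evaluated in batch mode on a finite snapshot."""
    return dict(Counter(user for user, _url in clicks))


def apply_change(result, change):
    """Apply one result change to a materialized result table (user -> count)."""
    user, cnt = change[-1]  # the new row is the last element for INSERT and UPDATE
    result[user] = cnt


clicks = [("Mary", "./home"), ("Bob", "./cart"), ("Mary", "./prod?id=1"), ("Liz", "./home")]

result, changes = {}, []
for i, change in enumerate(continuous_count(clicks)):
    changes.append(change)
    apply_change(result, change)
    # At every point in time, the continuous result equals the batch result on the prefix.
    assert result == batch_count(clicks[: i + 1])
```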
+The second query is similar to the first one but groups the `clicks` table not only on
the `user` attribute but also on an [hourly tumbling window](./sql.html#group-windows) before
it counts the number of URLs (time-based computations such as windows are based on special
[time attributes](#time-attributes), which are discussed below). Again, the figure shows
the input and output at different points in time to visualize the changing nature of dynamic
tables.
+<img alt="Continuous Group-Window Query" src="{{ site.baseurl }}/fig/table-streaming/query-groupBy-window-cnt.png">
+As before, the input table `clicks` is shown on the left. The query continuously computes
results every hour and updates the result table. The `clicks` table contains four rows with
timestamps (`cTime`) between `12:00:00` and `12:59:59`. The query computes two result rows
from this input (one for each `user`) and appends them to the result table. For the next window
between `13:00:00` and `13:59:59`, the `clicks` table contains three rows, which results in
another two rows being appended to the result table. The result table keeps growing as more
rows are appended to `clicks` over time.
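The following Python sketch mimics the windowed query under simplifying assumptions: clicks arrive ordered by `cTime`, and a window is treated as complete as soon as a click from a later window arrives (or the input ends). This ordering assumption is a stand-in for Flink's watermark-based tracking of event-time progress; the function names, the date, and the click data are hypothetical. Because each window result is final when it is emitted, the output consists of `INSERT` changes only.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW_SIZE = timedelta(hours=1)


def window_start(ts):
    """Align a timestamp to the start of its hourly tumbling window."""
    return ts.replace(minute=0, second=0, microsecond=0)


def emit(start, counts):
    """Emit one final result row (user, window_end, count) per user of a closed window."""
    end = start + WINDOW_SIZE
    for user in sorted(counts):
        yield ("INSERT", (user, end, counts[user]))  # never updated later


def windowed_count(clicks):
    """Count clicks per user and hourly tumbling window; assumes cTime-ordered input."""
    current, counts = None, Counter()
    for user, c_time, _url in clicks:
        start = window_start(c_time)
        if current is not None and start != current:
            yield from emit(current, counts)  # a later window started: close the current one
            counts = Counter()
        current = start
        counts[user] += 1
    if current is not None:
        yield from emit(current, counts)  # end of input closes the last window


def at(hour, minute, second=0):
    return datetime(2017, 1, 1, hour, minute, second)  # hypothetical date


clicks = [
    ("Mary", at(12, 0), "./home"), ("Bob", at(12, 0), "./cart"),
    ("Mary", at(12, 2), "./prod?id=1"), ("Mary", at(12, 55), "./prod?id=4"),
    ("Bob", at(13, 1), "./prod?id=5"), ("Liz", at(13, 30), "./home"),
    ("Liz", at(13, 59, 59), "./prod?id=7"),
]

results = list(windowed_count(clicks))
```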
+#### Update and Append Queries
+Although the two example queries appear to be quite similar (both compute a grouped count
aggregate), they differ in one important aspect: 
+- The first query updates previously emitted results, i.e., the changelog stream that defines
the result table contains `INSERT` and `UPDATE` changes. 
+- The second query only appends to the result table, i.e., the changelog stream of the result
table only consists of `INSERT` changes.
+Whether a query produces an append-only table or an updated table has some implications:
+- Queries that produce update changes usually have to maintain more state (see the following section).
+- The conversion of an append-only table into a stream is different from the conversion of
an updated table (see the [Table to Stream Conversion](#table-to-stream-conversion) section).

+#### Query Restrictions
+Many, but not all, semantically valid queries can be evaluated as continuous queries on streams.
Some queries are too expensive to compute, either due to the size of state that they need
to maintain or because computing updates is too expensive.
+- **State Size:** Continuous queries are evaluated on unbounded streams and are often supposed
to run for weeks or months. Hence, the total amount of data that a continuous query processes
can be very large. Queries that have to update previously emitted results need to maintain
all emitted rows in order to be able to update them. For instance, the first example query
needs to store the URL count for each user to be able to increase the count and send out a
new result when the input table receives a new row. If only registered users are tracked,
the number of counts to maintain might not be too high. However, if non-registered users get
a unique user name assigned, the number of counts to maintain would grow over time and might
eventually cause the query to fail.
+{% highlight sql %}
+SELECT user, COUNT(url)
+FROM clicks
+GROUP BY user;
+{% endhighlight %}
+- **Computing Updates:** Some queries need to recompute and update a large fraction of
the emitted result rows even if only a single input record is added or updated. Clearly, such
queries are not well suited to be executed as continuous queries. An example is the following
query, which computes for each user a `RANK` based on the time of the last click. As soon as
the `clicks` table receives a new row, the `lastAction` of the user is updated and a new rank
must be computed. Since the user's row moves to a new position in the ranking, the ranks of all
rows between its old and its new position change and need to be updated as well.
+{% highlight sql %}
+SELECT user, RANK() OVER (ORDER BY lastAction)
+FROM (
+  SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
+);
+{% endhighlight %}
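The cost of such updates can be made concrete with a short Python sketch. The function name and the integer timestamps are hypothetical, and ties share a rank as with SQL's `RANK`. One new click moves a user from the first to the last rank, and as a result every row of the ranked result changes.

```python
def rank_by_last_action(last_action):
    """RANK() OVER (ORDER BY lastAction) for a mapping user -> lastAction.

    Users with equal lastAction share a rank, and the following rank is skipped.
    """
    ordered = sorted(last_action.items(), key=lambda item: item[1])
    ranks, prev_ts, rank = {}, None, 0
    for position, (user, ts) in enumerate(ordered, start=1):
        if ts != prev_ts:
            rank = position
        ranks[user] = rank
        prev_ts = ts
    return ranks


# Hypothetical state: MAX(cTime) per user, as integer timestamps.
last_action = {"u1": 1, "u2": 2, "u3": 3, "u4": 4, "u5": 5}
before = rank_by_last_action(last_action)

# One new click by u1 at time 6 updates a single row of the inner aggregation ...
last_action["u1"] = max(last_action["u1"], 6)
after = rank_by_last_action(last_action)

# ... but every row of the ranked result changes.
changed = sorted(user for user in after if after[user] != before[user])
print(changed)  # ['u1', 'u2', 'u3', 'u4', 'u5']
```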
+The [QueryConfig](#query-configuration) section discusses parameters to control the execution
of continuous queries. Some parameters can be used to trade the size of maintained state for
result accuracy.
+### Table to Stream Conversion
+A dynamic table can be continuously modified by `INSERT`, `UPDATE`, and `DELETE` changes
just like a regular database table. It might be a table with a single row, which is constantly
updated, an insert-only table without `UPDATE` and `DELETE` modifications, or anything in between.
+When converting a dynamic table into a stream or writing it to an external system, these
changes need to be encoded. Flink's Table API and SQL support three ways to encode the changes
of a dynamic table:
+* **Append-only stream:** A dynamic table that is only modified by `INSERT` changes can be
 converted into a stream by emitting the inserted rows. 
+* **Retract stream:** A retract stream is a stream with two types of messages, *add messages*
and *retract messages*. A dynamic table is converted into a retract stream by encoding an
`INSERT` change as an add message, a `DELETE` change as a retract message, and an `UPDATE` change
as a retract message for the updated (previous) row and an add message for the updating (new)
row. The following figure visualizes the conversion of a dynamic table into a retract stream.
+<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/undo-redo-mode.png">
-This section will be reworked soon. Until then, please read the [introductory blog post about
Dynamic Tables](http://flink.apache.org/news/2017/04/04/dynamic-tables.html).
+* **Upsert stream:** An upsert stream is a stream with two types of messages, *upsert messages*
and *delete messages*. A dynamic table that is converted into an upsert stream requires a (possibly
composite) unique key. A dynamic table with a unique key is converted into a stream by
encoding `INSERT` and `UPDATE` changes as upsert messages and `DELETE` changes as delete messages.
The stream-consuming operator needs to be aware of the unique key attribute in order to apply
messages correctly. The main difference from a retract stream is that `UPDATE` changes are encoded
with a single message and are hence more efficient. The following figure visualizes the conversion
of a dynamic table into an upsert stream.
-**TO BE DONE:**
+<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/redo-mode.png" width="85%">
-* Stream -> Table
-* Table -> Stream
-* update changes / retraction
+The API to convert a dynamic table into a `DataStream` is discussed on the [Common Concepts](./common.html#convert-a-table-into-a-datastream)
page. Please note that only append and retract streams are supported when converting a dynamic
table into a `DataStream`. The `TableSink` interface to emit a dynamic table to an external
system is discussed on the [TableSources and TableSinks](./sourceSinks.html#define-a-tablesink)
page.
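The three encodings can be compared with a plain Python sketch. It is not Flink code: the change format, the function names, and the message layouts are hypothetical. Retract messages are modeled as `(is_add, row)` pairs, and upsert messages carry the unique key (here, `user`) so that a consumer can apply them. The example changes extend the `GROUP BY` count example with a hypothetical `DELETE`.

```python
from collections import Counter

# Changes keyed by user: ("INSERT", new), ("UPDATE", old, new), ("DELETE", old).
changes = [
    ("INSERT", ("Mary", 1)),
    ("INSERT", ("Bob", 1)),
    ("UPDATE", ("Mary", 1), ("Mary", 2)),
    ("INSERT", ("Liz", 1)),
    ("DELETE", ("Bob", 1)),  # hypothetical, for illustration
]


def to_append_stream(changes):
    """Append-only stream: only valid if the table receives INSERT changes only."""
    for change in changes:
        if change[0] != "INSERT":
            raise ValueError(f"{change[0]} cannot be encoded in an append-only stream")
        yield change[1]


def to_retract_stream(changes):
    """Retract stream: (True, row) is an add message, (False, row) a retract message."""
    for change in changes:
        if change[0] == "INSERT":
            yield (True, change[1])
        elif change[0] == "UPDATE":
            yield (False, change[1])  # retract the previous row
            yield (True, change[2])   # add the updated row
        elif change[0] == "DELETE":
            yield (False, change[1])
        else:
            raise ValueError(f"unknown change: {change[0]}")


def to_upsert_stream(changes, key):
    """Upsert stream: ("UPSERT", k, row) or ("DELETE", k, None) for unique key k."""
    for change in changes:
        if change[0] == "DELETE":
            yield ("DELETE", key(change[1]), None)
        else:  # INSERT and UPDATE both become a single upsert of the new row
            row = change[-1]
            yield ("UPSERT", key(row), row)


def apply_retract(messages):
    table = Counter()  # multiset of rows
    for is_add, row in messages:
        table[row] += 1 if is_add else -1
        if table[row] == 0:
            del table[row]
    return sorted(table.elements())


def apply_upsert(messages):
    table = {}  # unique key -> row
    for kind, k, row in messages:
        if kind == "UPSERT":
            table[k] = row
        else:
            table.pop(k, None)
    return sorted(table.values())


retract = list(to_retract_stream(changes))
upsert = list(to_upsert_stream(changes, key=lambda row: row[0]))
print(len(retract), len(upsert))  # 6 5
print(apply_retract(retract))     # [('Liz', 1), ('Mary', 2)]
```

The trade-off shows in the message counts: the upsert encoding needs one message per `UPDATE` instead of two, but its consumer must know the unique key to apply the messages.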
 {% top %}

diff --git a/docs/fig/table-streaming/append-mode.png b/docs/fig/table-streaming/append-mode.png
new file mode 100644
index 0000000..fbd4ae9
Binary files /dev/null and b/docs/fig/table-streaming/append-mode.png differ

diff --git a/docs/fig/table-streaming/query-groupBy-cnt.png b/docs/fig/table-streaming/query-groupBy-cnt.png
new file mode 100644
index 0000000..9870055
Binary files /dev/null and b/docs/fig/table-streaming/query-groupBy-cnt.png differ

diff --git a/docs/fig/table-streaming/query-groupBy-window-cnt.png b/docs/fig/table-streaming/query-groupBy-window-cnt.png
new file mode 100644
index 0000000..ab991e4
Binary files /dev/null and b/docs/fig/table-streaming/query-groupBy-window-cnt.png differ

diff --git a/docs/fig/table-streaming/redo-mode.png b/docs/fig/table-streaming/redo-mode.png
new file mode 100644
index 0000000..24b8ad0
Binary files /dev/null and b/docs/fig/table-streaming/redo-mode.png differ

diff --git a/docs/fig/table-streaming/stream-query-stream.png b/docs/fig/table-streaming/stream-query-stream.png
new file mode 100644
index 0000000..11a7d56
Binary files /dev/null and b/docs/fig/table-streaming/stream-query-stream.png differ

diff --git a/docs/fig/table-streaming/undo-redo-mode.png b/docs/fig/table-streaming/undo-redo-mode.png
new file mode 100644
index 0000000..1326d2e
Binary files /dev/null and b/docs/fig/table-streaming/undo-redo-mode.png differ
