flink-commits mailing list archives

From fhue...@apache.org
Subject flink git commit: [FLINK-6747] [docs] Add documentation for QueryConfig.
Date Mon, 10 Jul 2017 14:02:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master fb48dc2fd -> ccf10002b


[FLINK-6747] [docs] Add documentation for QueryConfig.

This closes #4256.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccf10002
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccf10002
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccf10002

Branch: refs/heads/master
Commit: ccf10002b83310347380bbf3694e736d4d6533c8
Parents: fb48dc2
Author: Fabian Hueske <fhueske@apache.org>
Authored: Tue Jul 4 23:28:22 2017 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Jul 10 16:02:03 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/streaming.md | 108 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 104 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ccf10002/docs/dev/table/streaming.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index c7f070b..1677dad 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -351,13 +351,113 @@ val windowedTable = tEnv
 Query Configuration
 -------------------
 
-In stream processing, compuations are constantly happening and there are many use cases that
require to update previously emitted results. There are many ways in which a query can compute
and emit updates. These do not affect the semantics of the query but might lead to approximated
results. 
+Table API and SQL queries have the same semantics regardless of whether their input is bounded
batch input or unbounded stream input. In many cases, continuous queries on streaming input
are capable of computing accurate results that are identical to offline computed results.
However, this is not possible in the general case because continuous queries have to restrict
the size of the state they maintain in order to avoid running out of storage and to
be able to process unbounded streaming data over a long period of time. As a result, a continuous
query might only be able to provide approximate results depending on the characteristics
of the input data and the query itself.
 
-Flink's Table API and SQL interface use a `QueryConfig` to control the computation and emission
of results and updates.
+Flink's Table API and SQL interface provide parameters to tune the accuracy and resource
consumption of continuous queries. The parameters are specified via a `QueryConfig` object.
The `QueryConfig` can be obtained from the `TableEnvironment` and is passed back when a `Table`
is translated, i.e., when it is [transformed into a DataStream](common.html#convert-a-table-into-a-datastream-or-dataset)
or [emitted via a TableSink](common.html#emit-a-table).
 
-### State Retention
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// obtain query configuration from TableEnvironment
+StreamQueryConfig qConfig = tableEnv.queryConfig();
+// set query parameters
+qConfig.withIdleStateRetentionTime(Time.hours(12));
+
+// define query
+Table result = ...
+
+// create TableSink
+TableSink<Row> sink = ...
+
+// emit result Table via a TableSink
+result.writeToSink(sink, qConfig);
+
+// convert result Table into a DataStream<Row>
+DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// obtain query configuration from TableEnvironment
+val qConfig: StreamQueryConfig = tableEnv.queryConfig
+// set query parameters
+qConfig.withIdleStateRetentionTime(Time.hours(12))
+
+// define query
+val result: Table = ???
+
+// create TableSink
+val sink: TableSink[Row] = ???
+
+// emit result Table via a TableSink
+result.writeToSink(sink, qConfig)
+
+// convert result Table into a DataStream[Row]
+val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
+
+{% endhighlight %}
+</div>
+</div>
+
+In the following we describe the parameters of the `QueryConfig` and how they affect
the accuracy and resource consumption of a query.
+
+### Idle State Retention Time
+
+Many queries aggregate or join records on one or more key attributes. When such a query is
executed on a stream, the continuous query needs to collect records or maintain partial results
per key. If the key domain of the input stream is evolving, i.e., the active key values are
changing over time, the continuous query accumulates more and more state as more and more
distinct keys are observed. However, often keys become inactive after some time and their
corresponding state becomes stale and useless.
+
+For example, the following query computes the number of clicks per session.
+
+```
+SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+```
+
+The `sessionId` attribute is used as a grouping key and the continuous query maintains a
count for each `sessionId` it observes. The `sessionId` attribute is evolving over time and
`sessionId` values are only active until the session ends, i.e., for a limited period of time.
However, the continuous query cannot know about this property of `sessionId` and expects that
every `sessionId` value can occur at any point in time. It maintains a count for each observed
`sessionId` value. Consequently, the total state size of the query is continuously growing
as more and more `sessionId` values are observed. 
+
+The *Idle State Retention Time* parameters define how long the state of a key is retained
without being updated before it is removed. For the previous example query, the count of a
`sessionId` would be removed as soon as it has not been updated for the configured period
of time.
+
+By removing the state of a key, the continuous query completely forgets that it has seen
this key before. If a record is processed whose key's state has already been removed,
the record is treated as if it were the first record with that key. For the
example above, this means that the count of a `sessionId` would start again at `0`.
+
+There are two parameters to configure the idle state retention time:
+- The *minimum idle state retention time* defines the shortest time the state of an inactive key is
kept before it is removed.
+- The *maximum idle state retention time* defines the longest time the state of an inactive key is
kept before it is removed.
+
+The parameters are specified as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+StreamQueryConfig qConfig = ...
+
+// set idle state retention time: min = 12 hours, max = 16 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16));
+// set idle state retention time: min = max = 12 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12));
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val qConfig: StreamQueryConfig = ???
+
+// set idle state retention time: min = 12 hours, max = 16 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16))
+// set idle state retention time: min = max = 12 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12))
+
+{% endhighlight %}
+</div>
+</div>
 
-**TO BE DONE**
+Configuring different minimum and maximum idle state retention times is more efficient because
it reduces the internal book-keeping of a query for when to remove state.
 
 {% top %}
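
Note (not part of the patch): the effect of the idle state retention time described in the added documentation can be illustrated with a small stand-alone model. This is only a sketch under simplifying assumptions, not Flink's implementation: the class `IdleRetentionCounter` and its methods are hypothetical names, time is passed in explicitly as milliseconds, and a single retention time is used (the min = max case), so the separate minimum and maximum bounds are not modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a per-key COUNT(*) with idle state cleanup. Hypothetical, not Flink code. */
final class IdleRetentionCounter {

    /** State kept per key: the running count and the time of the last update. */
    private static final class Entry {
        long count;
        long lastUpdateMillis;
    }

    private final long retentionMillis;
    private final Map<String, Entry> state = new HashMap<>();

    IdleRetentionCounter(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    /** Processes one record for the given key and returns the updated count. */
    long process(String key, long nowMillis) {
        expire(nowMillis);
        Entry e = state.computeIfAbsent(key, k -> new Entry());
        e.count++;
        e.lastUpdateMillis = nowMillis;
        return e.count;
    }

    /** Removes the state of every key that has been idle for at least the retention time. */
    void expire(long nowMillis) {
        state.values().removeIf(e -> nowMillis - e.lastUpdateMillis >= retentionMillis);
    }

    /** Number of keys for which state is currently held. */
    int stateSize() {
        return state.size();
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        IdleRetentionCounter counter = new IdleRetentionCounter(12 * hour);
        System.out.println(counter.process("s1", 0L));         // first click of session s1
        System.out.println(counter.process("s1", 1 * hour));   // s1 is still active
        System.out.println(counter.process("s2", 2 * hour));   // a second session appears
        // Both sessions have been idle for more than 12 hours at this point,
        // so their state is dropped and s1 is counted from scratch.
        System.out.println(counter.process("s1", 20 * hour));
        System.out.println(counter.stateSize());
    }
}
```

In this model the last call for `s1` is treated like the first record of a new key, which is the loss of accuracy the documentation trades for bounded state size.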
 

