flink-user mailing list archives

From Hequn Cheng <chenghe...@gmail.com>
Subject Re: Re: Is it possible to handle late data when using table API?
Date Wed, 17 Apr 2019 02:09:17 GMT
Hi Lasse,

> some devices can deliver data days back in time and I would like to have
the results as fast as possible.

What JingsongLee said is correct.

However, given your description above, it is possible to handle your problem
with the Table API: you can use a non-window (or unbounded) aggregate[1].
A non-window aggregate supports early firing, i.e., it outputs results
immediately whenever there is an update, so you can "have the results as fast
as possible". The query looks like:

 Table res30MinWindows = machineInsights
     .select("UserActionTime / (30 * 60) as windowId, machineId, machineInsightId, value")
     .groupBy("windowId, machineId, machineInsightId")
     .select("machineId, machineInsightId, windowId * 1800 as wStart, (windowId + 1) * 1800 as wEnd, value.max as max");

(This assumes UserActionTime is in epoch seconds; the integer division buckets
each record into a 30-minute window, and multiplying the bucket id back by
1800 recovers the window start and end timestamps.)
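To make the bucketing arithmetic explicit, here is a small self-contained
sketch (plain Java, no Flink required; the timestamp value is made up, and
UserActionTime is assumed to be in epoch seconds):

```java
// Illustration of the 30-minute bucketing used in the query above:
// integer-dividing an epoch-seconds timestamp by 1800 yields a window id,
// and multiplying back by 1800 recovers the window's start timestamp.
public class WindowIdDemo {

    static long windowId(long epochSeconds) {
        return epochSeconds / (30 * 60); // 1800-second buckets
    }

    static long windowStart(long epochSeconds) {
        return windowId(epochSeconds) * 1800;
    }

    public static void main(String[] args) {
        long t = 1_555_465_757L;           // an arbitrary epoch-seconds timestamp
        long start = windowStart(t);       // 1555464600
        long end = start + 1800;           // 1555466400
        System.out.println(start + " " + end);
        // every event inside [start, end) maps to the same windowId
        assert windowId(start) == windowId(end - 1);
    }
}
```

Two events whose timestamps fall in the same half-hour thus share a
windowId and are aggregated together, no matter how late the second one
arrives.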

The only thing to note is that, since a non-window aggregate keeps all
(result) data in its state, the state required to compute the query result
might grow infinitely, depending on the type of aggregation and the number of
distinct grouping keys. To solve this problem, you can provide a query
configuration with a valid retention interval to prevent excessive state
size[2]. In your case, I think a suitable retention interval would be the
maximum delay of your data.
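For reference, in Flink 1.6 the retention interval is supplied through a
StreamQueryConfig when the table is emitted; a minimal sketch, assuming a
Flink 1.6 StreamTableEnvironment named tableEnv is in scope and that two/three
days is the assumed maximum delay (the values are illustrative, not
prescribed):

```java
// Sketch: configure idle state retention for the unbounded aggregate
// (requires the Flink 1.6 Table API on the classpath; values are illustrative).
StreamQueryConfig qConfig = tableEnv.queryConfig();

// State for a grouping key is kept for at least minTime after its last
// update and is cleaned up at the latest after maxTime.
qConfig.withIdleStateRetentionTime(Time.days(2), Time.days(3));

// Pass the config when converting the result table to a stream.
tableEnv.toRetractStream(res30MinWindows, Row.class, qConfig).print();
```

With this in place, keys for devices that stop reporting are eventually
dropped from state instead of accumulating forever.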

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/tableApi.html#aggregations
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html


On Tue, Apr 16, 2019 at 5:38 PM Lasse Nedergaard <lassenedergaard@gmail.com>
wrote:

> Hi
>
> Thanks for the fast reply. Unfortunately it is not an option, as some devices
> can deliver data days back in time and I would like to have the results as
> fast as possible.
> I will have to convert my implementation to use the streaming API instead.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 16. apr. 2019 kl. 11.08 skrev JingsongLee <lzljs3620320@aliyun.com>:
>
> Hi @Lasse Nedergaard, the Table API doesn't have an allowedLateness API.
> But you can set rowtime.watermarks.delay on the source to slow down the
> watermark clock.
>
> ------------------------------------------------------------------
> From: Lasse Nedergaard <lassenedergaard@gmail.com>
> Sent: Tuesday, April 16, 2019, 16:20
> To: user <user@flink.apache.org>
> Subject: Is it possible to handle late data when using table API?
>
> Hi.
>
> I have a simple tumble window working on event time, like this:
>
> Table res30MinWindows = machineInsights
>         .window(Tumble.over("30.minutes").on("UserActionTime").as("w")) // define window
>         .groupBy("machineId, machineInsightId, w") // group by key and window
>         .select("machineId, machineInsightId, w.start, w.end, w.rowtime, value.max as max"); // access window properties and aggregate
>
> As we work with IoT units, we don't have 100% control over the event time
> reported, and we therefore need to handle late data to ensure that our
> calculations aren't wrong.
>
> I would like to know whether there is any option in the Table API to get
> access to late data, or if my only option is to use the Streaming API?
>
> Thanks in advance
>
> Lasse Nedergaard
>
