flink-user mailing list archives

From: Fabian Hueske <fhue...@apache.org>
Subject: Re: deduplication with streaming sql
Date: Tue, 06 Feb 2018 15:01:58 GMT
Hi Henkka,

This should be fairly easy to implement in a ProcessFunction.
You're making a good call to worry about the number of timers. If you
register a timer multiple times for the same time, the timer is deduplicated
;-) and will only fire once for that time.
That's why the state retention time allows you to set a min and a max time.
With that, you only have to set a new timer once every (max - min) interval.
For example, if you say the application should keep the state for at least
12 hours but at most 14 hours, you only need to register a new timer every 2
hours.
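
For illustration, a minimal sketch of that pattern (assuming a hypothetical
Event POJO as the record type; the function must run on a keyed stream, e.g.
stream.keyBy(...).process(new DedupFunction())):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Emits only the first record per key and clears the key's state after
// 12 to 14 hours of inactivity, registering a timer at most every 2 hours.
public class DedupFunction extends ProcessFunction<Event, Event> {

    private static final long MIN_RETENTION = 12 * 60 * 60 * 1000L; // 12h
    private static final long MAX_RETENTION = 14 * 60 * 60 * 1000L; // 14h

    // Firing time of the currently valid clean-up timer. Also serves as
    // the "have we seen this key" flag for the deduplication itself.
    private ValueState<Long> cleanupTime;

    @Override
    public void open(Configuration parameters) {
        cleanupTime = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanup-time", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out)
            throws Exception {
        Long curCleanupTime = cleanupTime.value();
        if (curCleanupTime == null) {
            // First record for this key (or its state was already cleaned up).
            out.collect(event);
        }

        long now = ctx.timerService().currentProcessingTime();
        // Register a new timer only if the current one would fire earlier
        // than now + MIN_RETENTION, i.e. at most every (MAX - MIN) interval.
        if (curCleanupTime == null || now + MIN_RETENTION > curCleanupTime) {
            long newCleanupTime = now + MAX_RETENTION;
            ctx.timerService().registerProcessingTimeTimer(newCleanupTime);
            cleanupTime.update(newCleanupTime);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out)
            throws Exception {
        // Clean up only if this is the most recently registered timer;
        // earlier, superseded timers just fire and are ignored.
        Long curCleanupTime = cleanupTime.value();
        if (curCleanupTime != null && timestamp == curCleanupTime) {
            cleanupTime.clear();
        }
    }
}

The cleanupTime state doubles as the marker that a key has been seen, so the
deduplication itself needs no extra state.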

Hope this helps,
Fabian

2018-02-06 15:47 GMT+01:00 Henri Heiskanen <henri.heiskanen@gmail.com>:

> Hi,
>
> Thanks.
>
> Doing this deduplication would be easy just by using the vanilla Flink API
> and state (check if this is a new key and then emit), but the issue has
> been automatic state cleanup. However, it looks like this streaming SQL
> retention time implementation uses a process function and timers. I was a
> bit reluctant to use that because I was worried that the approach would be
> overkill with our volumes, but maybe it will work just fine. Can you help
> me a bit with how to implement it efficiently?
>
> Basically, we get an estimated 20M distinct keys and roughly 300 events
> per key during one day. What I would like to do is to clear the state for
> a specific key if I have not seen that key for the last 12 hours. I think
> it's very close to the example here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
> Instead of emitting the data onTimer, I would just clear the state. In the
> example, each tuple will invoke registerEventTimeTimer(). Is this the
> correct pattern? For example, in our case we could get hundreds of events
> with the same key during a few minutes, so would we then register hundreds
> of timer instances?
>
> Br,
> Henkka
>
> On Tue, Feb 6, 2018 at 3:45 PM, Fabian Hueske <fhueske@apache.org> wrote:
>
>> Hi Henri,
>>
>> thanks for reaching out and providing code and data to reproduce the
>> issue.
>>
>> I think you are right, a "SELECT DISTINCT a, b, c FROM X" should not
>> result in a retraction stream.
>>
>> However, with the current implementation we internally need a retraction
>> stream if a state retention time is configured.
>> The reason lies in how the state retention time is defined: it removes
>> the state for a key if that key hasn't been seen for the configured time.
>> This means that an operator resets the state clean-up timer of a key
>> whenever a new record with that key is received. This is also true for
>> retraction / insertion messages of the same record.
>> If we implemented the GroupBy that performs the DISTINCT as an operator
>> that emits an append stream, downstream operators wouldn't see any
>> updates, because the GroupBy only emits the first record and filters out
>> all duplicates. Hence, downstream operators would perform their clean-up
>> too early.
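>>
>> To illustrate with the three sample rows at the bottom of this thread,
>> the retract sink sees something like:
>>
>>   (true,  AN1234, RC1234, ...)  // first AN1234 row: insert
>>   (false, AN1234, RC1234, ...)  // duplicate AN1234 row: retract ...
>>   (true,  AN1234, RC1234, ...)  // ... and re-insert the same record
>>   (true,  AN5555, RC5555, ...)  // first AN5555 row: insert
>>
>> The (retract, insert) pair carries no new data, but it resets the
>> clean-up timers of the downstream operators.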
>>
>> I see that these are internals that users should not need to worry
>> about, but right now there is no easy solution to this.
>> Eventually, the clean-up timer reset should be implemented differently
>> than by retracting and re-inserting the same record. However, this would
>> be a more involved change and requires good planning.
>>
>> I'll file a JIRA for that.
>>
>> Thanks again for bringing the issue to our attention.
>>
>> Best, Fabian
>>
>>
>> 2018-02-06 13:59 GMT+01:00 Timo Walther <twalthr@apache.org>:
>>
>>> Hi Henri,
>>>
>>> I just noticed that I had a tiny mistake in my little test program. So
>>> SELECT DISTINCT is officially supported. But the question of whether
>>> this is a valid append stream is still up for discussion. I will loop in
>>> Fabian (in CC).
>>>
>>> For the general behavior, you can also look at the code and especially
>>> the comments there [1].
>>>
>>> Regards,
>>> Timo
>>>
>>> [1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
>>>
>>>
>>> On 2/6/18 at 1:36 PM, Timo Walther wrote:
>>>
>>> Hi Henri,
>>>
>>> I'll try to answer your questions:
>>>
>>> 1) You are right, SELECT DISTINCT should not need a retract stream.
>>> Internally, this is translated into an aggregation without an aggregate
>>> function call. So this definitely needs improvement.
>>>
>>> 2) The problem is that SELECT DISTINCT is neither officially supported
>>> nor tested. I opened an issue for this [1].
>>>
>>> Until this issue is fixed, I would recommend implementing a custom
>>> aggregate function that keeps track of the values seen so far [2].
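>>>
>>> A minimal sketch of such a function (the class and function names here
>>> are made up for illustration, and how to wire it into the deduplication
>>> query is left open; see [2] for the full contract):
>>>
>>> import java.util.HashSet;
>>> import java.util.Set;
>>> import org.apache.flink.table.functions.AggregateFunction;
>>>
>>> // Accumulator that remembers every value seen so far within a group.
>>> public class SeenValuesAcc {
>>>     public Set<String> seen = new HashSet<>();
>>> }
>>>
>>> // Aggregate that returns the number of distinct values seen so far.
>>> public class DistinctCount
>>>         extends AggregateFunction<Long, SeenValuesAcc> {
>>>
>>>     @Override
>>>     public SeenValuesAcc createAccumulator() {
>>>         return new SeenValuesAcc();
>>>     }
>>>
>>>     // Invoked by the runtime for every input row of the group.
>>>     public void accumulate(SeenValuesAcc acc, String value) {
>>>         acc.seen.add(value);
>>>     }
>>>
>>>     @Override
>>>     public Long getValue(SeenValuesAcc acc) {
>>>         return (long) acc.seen.size();
>>>     }
>>> }
>>>
>>> It can be registered via tableEnv.registerFunction("distinctCount",
>>> new DistinctCount()) and then used like a built-in aggregate in a
>>> GROUP BY query.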
>>>
>>> Regards,
>>> Timo
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-8564
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
>>>
>>>
>>> On 2/6/18 at 11:11 AM, Henri Heiskanen wrote:
>>>
>>> Hi,
>>>
>>> I have a use case where I would like to find distinct rows over a
>>> certain period of time. The requirement is that a new row is emitted as
>>> soon as possible. Otherwise, the requirement is mainly to just filter
>>> the data down to a smaller dataset for downstream processing. I noticed
>>> that SELECT DISTINCT with a state retention time of 12 hours would in
>>> theory do the trick. You can find the code below. A few questions:
>>>
>>> 1) Why is SELECT DISTINCT creating a retract stream? In which scenarios
>>> would we get update/delete rows?
>>>
>>> 2) If I run the code below with the example data (also below) without
>>> the state retention config, I get the two append rows (expected). If I
>>> run exactly the code below (with the retention config), I'll get two
>>> appends and one delete for AN1234, and then one append for AN5555. What
>>> is going on?
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> StreamTableEnvironment tableEnv =
>>>     TableEnvironment.getTableEnvironment(env);
>>>
>>> StreamQueryConfig qConfig = tableEnv.queryConfig();
>>> // set idle state retention time: min = max = 12 hours
>>> qConfig.withIdleStateRetentionTime(Time.hours(12));
>>>
>>> // create a TableSource
>>> CsvTableSource csvSource = CsvTableSource
>>>     .builder()
>>>     .path("data.csv")
>>>     .field("ts", Types.SQL_TIMESTAMP())
>>>     .field("aid1", Types.STRING())
>>>     .field("aid2", Types.STRING())
>>>     .field("advertiser_id", Types.STRING())
>>>     .field("platform_id", Types.STRING())
>>>     .fieldDelimiter(",")
>>>     .build();
>>>
>>> tableEnv.registerTableSource("CsvTable", csvSource);
>>>
>>> Table result = tableEnv.sqlQuery(
>>>     "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");
>>>
>>> StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(
>>>     new String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
>>>     new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(),
>>>         Types.STRING()});
>>>
>>> result.writeToSink(out, qConfig);
>>>
>>> env.execute();
>>>
>>>
>>> Here is a simple CSV dataset of three rows:
>>>
>>> 2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
>>> 2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
>>> 2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234,1234567891
>>>
>>>
>>>
>>>
>>
>
