flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Henri Heiskanen <henri.heiska...@gmail.com>
Subject Re: deduplication with streaming sql
Date Tue, 06 Feb 2018 14:47:20 GMT
Hi,

Thanks.

Doing this deduplication would be easy just by using vanilla flink api and
state (check if this is a new key and then emit), but the issue has been
automatic state cleanup. However, it looks like this streaming sql
retention time implementation uses the process function and timer. I was a
bit reluctant to use that because I was worried that the approach would be
overkill with our volumes, but maybe it will work just fine. Can you help
me a bit how to implement it efficiently?

Basically we get estimated of 20M of distinct rows/key and roughly 300
events per key during one day. What I would like to do is to clear the
state for specific key if I have not seen such key for last 12 hours. I
think its very close to example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html.
Instead of emitting the data onTimer I would just clear the state. In the
example each tuple will invoke registerEventTimeTimer(). Is this the
correct pattern? E.g. in our case we could get hundreds of events with the
same key during few minutes, so would we then register hundreds of timer
instances?

Br,
Henkka

On Tue, Feb 6, 2018 at 3:45 PM, Fabian Hueske <fhueske@apache.org> wrote:

> Hi Henri,
>
> thanks for reaching out and providing code and data to reproduce the issue.
>
> I think you are right, a "SELECT DISTINCT a, b, c FROM X"  should not
> result in a retraction stream.
>
> However, with the current implementation we internally need a retraction
> stream if a state retention time is configured.
> The reason lies in how state retention time is defined: the state
> retention time will remove the state for a key if it hasn't been seen for x
> time.
> This means that an operator resets a state clean-up timer of a key
> whenever a new record with that key is received. This is also true for
> retraction / insertion messages of the same record.
> If we implement the GroupBy that performs the DISTINCT as an operator that
> emits an append stream, all downstream operator won't see any updates
> because the GroupBy only emits the first and filters out all duplicates.
> Hence, downstream operators would perform a clean-up too early.
>
> I see that these are internals that users should not need to worry about,
> but right now there is no easy solution to this.
> Eventually, the clean-up timer reset should be differently implemented
> than using retraction and insert of the same record. However, this would be
> a more involved change and requires good planning.
>
> I'll file a JIRA for that.
>
> Thanks again for bringing the issue to our attention.
>
> Best, Fabian
>
>
> 2018-02-06 13:59 GMT+01:00 Timo Walther <twalthr@apache.org>:
>
>> Hi Henri,
>>
>> I just noticed that I had a tiny mistake in my little test program. So
>> SELECT DISTINCT is officially supported. But the question if this is a
>> valid append stream is still up for discussion. I will loop in Fabian (in
>> CC).
>>
>> For the general behavior you can also look into the code and especially
>> the comments there [1].
>>
>> Regards,
>> Timo
>>
>> [1] https://github.com/apache/flink/blob/master/flink-libraries/
>> flink-table/src/main/scala/org/apache/flink/table/
>> runtime/aggregate/GroupAggProcessFunction.scala
>>
>>
>> Am 2/6/18 um 1:36 PM schrieb Timo Walther:
>>
>> Hi Henri,
>>
>> I try to answer your question:
>>
>> 1) You are right, SELECT DISTINCT should not need a retract stream.
>> Internally, this is translated into an aggregation without an aggregate
>> function call. So this definitely needs improvement.
>>
>> 2) The problem is that SELECT DISTINCT is not officially supported nor
>> tested. I opened an issue for this [1].
>>
>> Until this issue is fixed I would recommend to implement a custom
>> aggregate function that keeps track values seen so far [2].
>>
>> Regards,
>> Timo
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8564
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/table/udfs.html#aggregation-functions
>>
>>
>> Am 2/6/18 um 11:11 AM schrieb Henri Heiskanen:
>>
>> Hi,
>>
>> I have a use case where I would like to find distinct rows over certain
>> period of time. Requirement is that new row is emitted asap. Otherwise the
>> requirement is mainly to just filter out data to have smaller dataset for
>> downstream. I noticed that SELECT DISTINCT and state retention time of 12
>> hours would in theory do the trick. You can find the code below. Few
>> questions.
>>
>> 1) Why is SELECT DISTINCT creating a retract stream? In which scenarios
>> we would get update/delete rows?
>>
>> 2) If I run the below code with the example data (also below) without
>> state retention config I get the two append rows (expected). If I run
>> exactly the code below (with the retention config) I'll get two appends and
>> one delete for AN1234 and then one append for AN5555. What is going on?
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>> ExecutionEnvironment();
>>
>> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvir
>> onment(env);
>>
>> StreamQueryConfig qConfig = tableEnv.queryConfig();
>> // set idle state retention time. min = max = 12 hours
>> qConfig.withIdleStateRetentionTime(Time.hours(12));
>>
>> // create a TableSource
>> CsvTableSource csvSource = CsvTableSource
>> .builder()
>> .path("data.csv")
>> .field("ts", Types.SQL_TIMESTAMP())
>> .field("aid1", Types.STRING())
>> .field("aid2", Types.STRING())
>> .field("advertiser_id", Types.STRING())
>> .field("platform_id", Types.STRING())
>> .fieldDelimiter(",")
>> .build();
>>
>> tableEnv.registerTableSource("CsvTable", csvSource);
>>
>> Table result = tableEnv.sqlQuery(
>> "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");
>>
>> StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(new
>> String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
>> new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING()});
>>
>> result.writeToSink(out, qConfig);
>>
>> env.execute();
>>
>>
>> Here is a simple csv dataset of three rows:
>>
>> 2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-
>> 1234,1234567890
>> 2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-
>> 1234,1234567890
>> 2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-
>> 1234,1234567891
>>
>>
>>
>>
>

Mime
View raw message