flink-user mailing list archives

From: Rico Bergmann <i...@ricobergmann.de>
Subject: Re: Duplicates in Flink
Date: Thu, 03 Sep 2015 12:31:19 GMT
The KafkaSink is the last step in my program after the 2nd deduplication. 

I have not yet been able to track down where the duplicates show up. That's a bit difficult to find out, but I'm trying.
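
For context, on the 0.9.x API the tail of such a pipeline would look roughly like the sketch below. It reuses the sessions stream from the program quoted further down; the broker address, the topic name, and the serialization via toString() are assumptions for illustration, not details from Rico's program.

    // Sketch only: attach a KafkaSink after the second deduplication.
    // Assumes the 0.9.x connector classes (KafkaSink, SimpleStringSchema)
    // are on the classpath and imported.
    sessions
        .map(new MapFunction<Session, String>() {
            @Override
            public String map(Session s) throws Exception {
                return s.toString(); // assumed serialization to String
            }
        })
        .addSink(new KafkaSink<String>(
            "localhost:9092",           // Kafka broker list (placeholder)
            "sessions-out",             // output topic (placeholder)
            new SimpleStringSchema())); // String -> bytes for Kafka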



> On 03.09.2015 at 14:14, Stephan Ewen <sewen@apache.org> wrote:
> 
> Can you tell us where the KafkaSink comes into play? At what point do the duplicates come up?
> 
>> On Thu, Sep 3, 2015 at 2:09 PM, Rico Bergmann <info@ricobergmann.de> wrote:
>> No. I mean the KafkaSink. 
>> 
>> A bit more insight into my program: I read from a Kafka topic with FlinkKafkaConsumer082, then hash-partition the data, then I do a deduplication (it does not eliminate all duplicates, though). Then some computation, and afterwards again a deduplication (group by message in a window over the last 2 seconds; see the sketch after this message).
>> 
>> Of course the last deduplication is not perfect.
>> 
>> Cheers. Rico. 
>> 
>> 
>> 
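
The thread never shows Rico's deduplication operators. As a rough illustration of a "group by message in a 2-second window" style deduplication, the following best-effort filter remembers each message for two seconds and drops re-occurrences in that span; the class name and the plain HashMap state are assumptions, not Rico's code. Because the state is local to each parallel instance and bounded by the window, a duplicate arriving on another instance, or after eviction, slips through, which matches the "not perfect" caveat above.

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical best-effort deduplicator (not the actual operator).
    public class TwoSecondDeduplicator implements FlatMapFunction<String, String> {

        private static final long WINDOW_MS = 2000; // "last 2 seconds"

        // Message -> timestamp of last emission, local to this instance.
        private final Map<String, Long> seen = new HashMap<String, Long>();

        @Override
        public void flatMap(String message, Collector<String> out) {
            long now = System.currentTimeMillis();

            // Evict entries that fell out of the 2-second window.
            Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
            while (it.hasNext()) {
                if (now - it.next().getValue() > WINDOW_MS) {
                    it.remove();
                }
            }

            // Forward a message only if it was not seen within the window.
            if (!seen.containsKey(message)) {
                seen.put(message, now);
                out.collect(message);
            }
        }
    }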
>>> On 03.09.2015 at 13:29, Stephan Ewen <sewen@apache.org> wrote:
>>> 
>>> Do you mean the KafkaSource?
>>> 
>>> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the KafkaSource?
>>> 
>>>> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <info@ricobergmann.de> wrote:
>>>> Hi!
>>>> 
>>>> Testing it with the current 0.10 snapshot is not easily possible at the moment.
>>>> 
>>>> But I deactivated checkpointing in my program and still get duplicates in my output. So it does not seem to come only from the checkpointing feature, does it?
>>>> 
>>>> Maybe the KafkaSink is responsible for this? (Just my guess.)
>>>> 
>>>> Cheers Rico. 
>>>> 
>>>> 
>>>> 
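
For reference, checkpointing on this API is a single switch on the execution environment, so "deactivated checkpointing" presumably means a line like the commented-out one below (the interval is an example value, not from Rico's program):

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Enabled: snapshot the streaming job every 1000 ms (example interval).
    // env.enableCheckpointing(1000);
    //
    // Left commented out, the job runs without checkpoints; duplicates that
    // still appear in this mode cannot stem from checkpoint recovery alone.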
>>>>> On 01.09.2015 at 15:37, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>> 
>>>>> Hi Rico,
>>>>> unfortunately the 0.9 branch still seems to have problems with exactly-once processing and checkpointed operators. We reworked how checkpoints are handled for the 0.10 release, so it should work well there.
>>>>> 
>>>>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the problems persist there?
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>> 
>>>>>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann <info@ricobergmann.de> wrote:
>>>>>> Hi!
>>>>>> 
>>>>>> I still have an issue... I was now using 0.9.1 and the new Kafka connector, but I still get duplicates in my Flink program. Here's the relevant part:
>>>>>> 
>>>>>> final FlinkKafkaConsumer082<String> kafkaSrc =
>>>>>>     new FlinkKafkaConsumer082<String>(
>>>>>>         kafkaTopicIn, new SimpleStringSchema(), properties);
>>>>>> 
>>>>>> DataStream<String> start = env.addSource(kafkaSrc)
>>>>>>     .setParallelism(numReadPartitions); // numReadPartitions = 2
>>>>>> 
>>>>>> DataStream<JSONObject> jsonized = start
>>>>>>     .flatMap(new ExtractAndFilterJSON());
>>>>>> 
>>>>>> DataStream<Session> sessions = jsonized
>>>>>>     .partitionByHash(new KeySelector<JSONObject, String>() {
>>>>>>         /**
>>>>>>          * Partition by session id.
>>>>>>          */
>>>>>>         @Override
>>>>>>         public String getKey(JSONObject value) throws Exception {
>>>>>>             try {
>>>>>>                 return /*session id*/;
>>>>>>             } catch (Exception e) {
>>>>>>                 LOG.error("no session could be retrieved", e);
>>>>>>             }
>>>>>>             return "";
>>>>>>         }
>>>>>>     }).flatMap(new StatefulSearchSessionizer());
>>>>>> 
>>>>>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm sure that the Kafka topic I'm reading from does not contain any duplicates. So it must be in the Flink program...
>>>>>> 
>>>>>> Any ideas?
>>>>>> 
>>>>>> Cheers, Rico.
> 
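
The StatefulSearchSessionizer itself never appears in the thread. As background for the exactly-once discussion above: on the 0.9.x API a stateful operator typically implements the Checkpointed interface, roughly as in the skeleton below, where names and session logic are invented for illustration. After a failure, Flink restores the last snapshot and replays the records received since then; when that mechanism misbehaves, as Aljoscha describes for the 0.9 branch, re-processed records can surface downstream as duplicates (though Rico also saw them with checkpointing disabled, so that was evidently not the whole story here).

    import java.util.HashMap;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    import org.apache.flink.util.Collector;

    // Hypothetical skeleton of a stateful sessionizer; not Rico's code.
    public class SessionizerSkeleton
            extends RichFlatMapFunction<String, String>
            implements Checkpointed<HashMap<String, String>> {

        // Open sessions keyed by session id (assumed state representation).
        private HashMap<String, String> openSessions =
            new HashMap<String, String>();

        @Override
        public void flatMap(String event, Collector<String> out) {
            // A real sessionizer would update openSessions here and emit
            // completed sessions; this sketch simply forwards the event.
            out.collect(event);
        }

        @Override
        public HashMap<String, String> snapshotState(
                long checkpointId, long checkpointTimestamp) {
            return openSessions; // drawn into each checkpoint
        }

        @Override
        public void restoreState(HashMap<String, String> state) {
            openSessions = state; // reinstated after a failure
        }
    }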
