flink-user mailing list archives

From Rico Bergmann <info@ricobergmann.de>
Subject Re: Duplicates in Flink
Date Thu, 03 Sep 2015 12:09:45 GMT
No. I mean the KafkaSink. 

A bit more insight into my program: I read from a Kafka topic with
FlinkKafkaConsumer082, then hash-partition the data, then I do a
deduplication (which does not eliminate all duplicates, though). Then comes
some computation, followed by another deduplication (group by message in a
window over the last 2 seconds).

Of course the last deduplication is not perfect.
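
For illustration, such a best-effort deduplication step can be sketched as a
stateful flat map (hypothetical names, not the actual operator). It also
shows why it cannot be perfect: the seen-set lives in operator memory only,
so it is lost on restart and lets replayed messages through.

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Best-effort deduplication: forwards a message only the first time its
    // key is seen. The set is plain operator memory: it is lost on restart
    // and grows without bound unless old entries are evicted.
    public class BestEffortDedup extends RichFlatMapFunction<String, String> {

        private transient Set<String> seen;

        @Override
        public void open(Configuration parameters) {
            seen = new HashSet<String>();
        }

        @Override
        public void flatMap(String msg, Collector<String> out) {
            if (seen.add(key(msg))) {
                out.collect(msg);
            }
        }

        // stand-in for however a stable message id is derived
        private String key(String msg) {
            return msg;
        }
    }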

Cheers, Rico.



> On 03.09.2015, at 13:29, Stephan Ewen <sewen@apache.org> wrote:
> 
> Do you mean the KafkaSource?
> 
> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the KafkaSource?
> 
>> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <info@ricobergmann.de> wrote:
>> Hi!
>> 
>> Testing it with the current 0.10 snapshot is not easily possible atm
>> 
>> But I deactivated checkpointing in my program and still get duplicates in
>> my output. So it seems not to come only from the checkpointing feature,
>> right?
>> 
>> Maybe the KafkaSink is responsible for this? (Just my guess.)
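>> 
>> (For reference, a minimal sketch of how checkpointing is toggled on the
>> execution environment; the 5000 ms interval is just an example value:)
>> 
>>     StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     // take a checkpoint every 5 seconds
>>     env.enableCheckpointing(5000);
>>     // leaving this call out runs the job without checkpointing,
>>     // which is what the test described above did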
>> 
>> Cheers, Rico.
>> 
>> 
>> 
>>> On 01.09.2015, at 15:37, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>> 
>>> Hi Rico,
>>> unfortunately the 0.9 branch still seems to have problems with exactly-once
>>> processing and checkpointed operators. We reworked how the checkpoints are
>>> handled for the 0.10 release, so it should work well there.
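>>> 
>>> (Illustration: an operator takes part in those checkpoints by handing its
>>> state to the runtime. A minimal sketch against the Checkpointed interface
>>> of that era; the counter state is a made-up example:)
>>> 
>>>     public class CountingMap extends RichMapFunction<String, String>
>>>             implements Checkpointed<Long> {
>>> 
>>>         private long count; // the operator state
>>> 
>>>         @Override
>>>         public String map(String value) {
>>>             count++;
>>>             return value;
>>>         }
>>> 
>>>         @Override
>>>         public Long snapshotState(long checkpointId, long checkpointTimestamp) {
>>>             return count; // written into the checkpoint
>>>         }
>>> 
>>>         @Override
>>>         public void restoreState(Long state) {
>>>             count = state; // re-installed after recovery
>>>         }
>>>     }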
>>> 
>>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the
>>> problems persist there?
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann <info@ricobergmann.de> wrote:
>>>> Hi!
>>>> 
>>>> I still have an issue... I was now using 0.9.1 and the new
>>>> KafkaConnector, but I still get duplicates in my Flink program. Here's
>>>> the relevant part:
>>>> 
>>>>     final FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>(
>>>>         kafkaTopicIn, new SimpleStringSchema(), properties);
>>>> 
>>>>     DataStream<String> start = env.addSource(kafkaSrc)
>>>>         .setParallelism(numReadPartitions); // numReadPartitions = 2
>>>> 
>>>>     DataStream<JSONObject> jsonized = start
>>>>         .flatMap(new ExtractAndFilterJSON());
>>>> 
>>>>     DataStream<Session> sessions = jsonized
>>>>         .partitionByHash(new KeySelector<JSONObject, String>() {
>>>>             /**
>>>>              * Partition by session id.
>>>>              */
>>>>             @Override
>>>>             public String getKey(JSONObject value) throws Exception {
>>>>                 try {
>>>>                     return /*session id*/;
>>>>                 } catch (Exception e) {
>>>>                     LOG.error("no session could be retrieved", e);
>>>>                 }
>>>>                 // fallback key: all records without a session id share ""
>>>>                 // and therefore end up in the same partition
>>>>                 return "";
>>>>             }
>>>>         }).flatMap(new StatefulSearchSessionizer());
>>>> 
>>>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>>>> sure that the Kafka topic I'm reading from does not contain any
>>>> duplicates. So it must happen in the Flink program ...
>>>> 
>>>> Any ideas?
>>>> 
>>>> Cheers, Rico.
> 
