flink-user mailing list archives

From Rico Bergmann <i...@ricobergmann.de>
Subject Re: Duplicates in Flink
Date Thu, 03 Sep 2015 11:10:49 GMT
Hi!

Testing it with the current 0.10 snapshot is not easily possible for me at the moment.

But I deactivated checkpointing in my program and still get duplicates in my output. So
the duplicates don't seem to come from the checkpointing feature alone, do they?

Maybe the KafkaSink is responsible for this? (Just my guess.)
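For context on why checkpointing can produce duplicates at all: Flink's fault tolerance at this stage rewinds the source to the last checkpointed offset after a failure, so every record processed after that checkpoint is emitted a second time (at-least-once delivery). A self-contained simulation of that replay behavior (plain Java, not Flink API; all names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates how checkpoint replay yields duplicates under at-least-once delivery. */
public class ReplaySimulation {

    public static List<String> run() {
        List<String> input = List.of("a", "b", "c", "d", "e");
        List<String> output = new ArrayList<>();
        int checkpointedOffset = 0;

        // First attempt: checkpoint after "b", then crash after emitting "d".
        for (int i = 0; i < input.size(); i++) {
            output.add(input.get(i));
            if (i == 1) checkpointedOffset = i + 1; // checkpoint covers offsets 0..1
            if (i == 3) break;                      // simulated failure
        }

        // Restart: resume from the last checkpointed offset,
        // so "c" and "d" are emitted a second time.
        for (int i = checkpointedOffset; i < input.size(); i++) {
            output.add(input.get(i));
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, c, d, c, d, e] -- c and d are duplicated
    }
}
```

If duplicates still appear with checkpointing disabled, as described above, then replay cannot be the whole story, which is what makes the observation in this message notable.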

Cheers Rico. 



> On 01.09.2015, at 15:37, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
> Hi Rico,
> unfortunately the 0.9 branch still seems to have problems with exactly-once processing
> and checkpointed operators. We reworked how checkpoints are handled for the 0.10 release,
> so it should work well there.
> 
> Could you maybe try running on the 0.10-SNAPSHOT release and see if the problems
> persist there?
> 
> Cheers,
> Aljoscha
> 
>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann <info@ricobergmann.de> wrote:
>> Hi!
>> 
>> I still have an issue... I was now using 0.9.1 and the new
>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>> relevant part:
>> 
>>     final FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>(
>>         kafkaTopicIn, new SimpleStringSchema(), properties);
>> 
>>     DataStream<String> start = env.addSource(kafkaSrc)
>>         .setParallelism(numReadPartitions); // numReadPartitions = 2
>> 
>>     DataStream<JSONObject> jsonized = start
>>         .flatMap(new ExtractAndFilterJSON());
>> 
>>     DataStream<Session> sessions = jsonized
>>         .partitionByHash(new KeySelector<JSONObject, String>() {
>>             /** Partition by session id. */
>>             @Override
>>             public String getKey(JSONObject value) throws Exception {
>>                 try {
>>                     return /*session id*/;
>>                 } catch (Exception e) {
>>                     LOG.error("no session could be retrieved", e);
>>                 }
>>                 return "";
>>             }
>>         }).flatMap(new StatefulSearchSessionizer());
>> 
>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>> sure that the kafka topic I'm reading from does not contain any
>> duplicates. So it must be in the flink program ...
>> 
>> Any ideas?
>> 
>> Cheers, Rico.
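The thread leaves the question open, but a common interim workaround while exactly-once is being fixed is to filter duplicates downstream, which works whenever each record carries a stable unique id. A minimal plain-Java sketch of that idea (the `Deduplicator` class and its method are hypothetical, not Flink or thread code; in a real job this would live as keyed state inside a stateful operator such as the `StatefulSearchSessionizer` above):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Drops records whose id has already been seen; a downstream guard against duplicates. */
public class Deduplicator {

    private final Set<String> seen = new HashSet<>();

    /** Returns true the first time an id is observed, false on any repeat. */
    public boolean accept(String id) {
        return seen.add(id); // Set.add returns false if the element was already present
    }

    public static void main(String[] args) {
        Deduplicator dedup = new Deduplicator();
        List<String> out = new ArrayList<>();
        for (String id : List.of("s1", "s2", "s1", "s3", "s2")) {
            if (dedup.accept(id)) {
                out.add(id);
            }
        }
        System.out.println(out); // [s1, s2, s3]
    }
}
```

Note that an unbounded in-memory set grows forever; a production version would need to bound it, e.g. by expiring ids after a time window.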
