flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Dipl.-Inf. Rico Bergmann" <i...@ricobergmann.de>
Subject Duplicates in Flink
Date Tue, 01 Sep 2015 12:37:48 GMT
Hi!

I still have an issue... I was now using 0.9.1 and the new 
KafkaConnector. But I still get duplicates in my flink prog. Here's the 
relevant part:

         final FlinkKafkaConsumer082<String> kafkaSrc = new 
FlinkKafkaConsumer082<String>(
             kafkaTopicIn, new SimpleStringSchema(), properties);

         DataStream<String> start = env.addSource(kafkaSrc)
             .setParallelism(numReadPartitions); //numReadPartitions = 2

         DataStream<JSONObject> jsonized = start
             .flatMap(new ExtractAndFilterJSON());

         DataStream<Session> sessions = jsonized
             .partitionByHash(new KeySelector<JSONObject, String>() {
             /**
              * partition by session id
              */
             @Override
             public String getKey(JSONObject value)
                 throws Exception {
                 try {
                 return /*session id*/;
                 } catch (Exception e) {
                 LOG.error("no session could be retrieved", e);
                 }
                 return "";
             }
             }).flatMap(new StatefulSearchSessionizer());

In the StatefulSearchSessionizer I receive duplicates sporadically. I'm 
sure that the kafka topic I'm reading from does not contain any 
duplicates. So it must be in the flink program ...

Any ideas?

Cheers, Rico.


Mime
View raw message