flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Weird Kafka Connector issue
Date Wed, 25 Apr 2018 06:59:26 GMT

Just to clarify your observation here:

Is the problem the fact that the map operators after the “source: travel” Kafka topic source
do not receive all records from the source?
This does seem weird, but as of now I don’t have any ideas on how this could be
Flink related.

One other thing to be sure of: have you verified that the outputs of your test are actually incorrect?
Or are you assuming they are incorrect based on the weird metric numbers shown on the web UI?


On 25 April 2018 at 6:13:07 AM, TechnoMage (mlatta@technomage.com) wrote:

Still trying to figure out what is happening with this kafka topic.

1) No exceptions in the task manager log or the UI exceptions tab.
2) Topic partitions being reset to offset 0 confirmed in log.
3) Other topics in this and other tests show full consumption of messages (all JSON format).
4) The source shows more records output than are received by 2 of the 3 following stages.

The diagram for the job is below, as is the GUI showing tasks and record counts.

On Apr 23, 2018, at 11:23 AM, TechnoMage <mlatta@technomage.com> wrote:

I have been using the Kafka connector successfully for a while now, but am getting weird
results in one case.

I have a test that submits 3 streams to Kafka topics and monitors them in a separate process.
 The Flink job has a source for each topic, and one such source is fed to 3 separate map functions
that lead to other operators.  This topic only shows 6097 of 30000 published records, and the
map functions following the source show only a fraction of that as received.  The consumer
is configured to start at the beginning, and in other cases the same code receives all messages
published.  The parallelism is 6, if that makes a difference, as is the partitioning on the topic.
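With parallelism 6 on a 6-partition topic, each source subtask typically ends up owning one partition, so skewed keying can make per-subtask record counts look very uneven in the web UI even when nothing is lost. The sketch below is not Flink's actual connector code, just a simplified model of round-robin partition-to-subtask assignment (the real assigner also derives a start index from a hash of the topic name; here it is a plain parameter):

```java
// Simplified sketch of round-robin assignment of Kafka partitions to
// parallel source subtasks. NOT Flink's real implementation -- the real
// assigner computes startIndex from a hash of the topic name; here it is
// passed in directly for illustration.
public class PartitionAssignmentSketch {

    // Subtask index (0..parallelism-1) that would read the given partition.
    static int subtaskFor(int partition, int startIndex, int parallelism) {
        return (startIndex + partition) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 6, partitions = 6;
        for (int p = 0; p < partitions; p++) {
            // With equal partition and subtask counts, each subtask gets
            // exactly one partition.
            System.out.println("partition " + p + " -> subtask "
                + subtaskFor(p, 0, parallelism));
        }
    }
}
```

If the publisher keys most records to a few partitions, the matching subtasks carry most of the record count, which can look like "missing" records when reading the per-subtask metrics.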

The code for creating the topic is below.

Any suggestions on why it is missing so many messages would be welcome.


      // Build the Kafka consumer for this event type, creating it only once.
      String topic = a.kafkaTopicName(ets);
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", servers);
      // Fresh group id on every run, so no offsets are resumed from a prior run.
      props.setProperty("group.id", UUID.randomUUID().toString());
      // With no committed offsets for the (new) group, start from the beginning.
      props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      DataStream<String> ds = consumers.get(a.eventType);
      if (ds == null) {
        FlinkKafkaConsumer011<String> cons = new FlinkKafkaConsumer011<String>(
            topic, new SimpleStringSchema(), props);
        ds = env.addSource(cons).name(et.name).rebalance();
        consumers.put(a.eventType, ds);
      }