flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Kumar Bolar, Harshith" <hk...@arity.com>
Subject FlinkKafkaProducer010: why is checkErroneous() at the beginning of the invoke() method
Date Fri, 12 Apr 2019 09:00:34 GMT
Hi all,

I had a requirement to handle Kafka producer exceptions so that they don’t bring down the
job. I extended FlinkKafkaProducer010 and handled the exceptions there.

public void invoke(T value, Context context) throws Exception {
              try {
this.checkErroneous();
                     ...
this.producer.send(record, this.callback);
              } catch (Exception exception) {
                     // Handle exception
              }
       }
The problem with this is, because checkErroneous() is at the beginning of the invoke() method,
the catch block is getting triggered for the next message – not for the message that is
causing the exception. So, I moved checkErroneous() below producer.send() like so –

       public void invoke(T value, Context context) throws Exception {
              try {
                     ...
                     this.producer.send(record, this.callback);
                     this.checkErroneous();
              } catch (Exception exception) {
                     // Handle exception
              }
       }

This solved the issue, the exceptions are now being thrown for the message that’s causing
the error instead of the next message.

Is there a specific reason why checkErroneous() is on top? Or am I doing something wrong?
Class: https://github.com/apache/flink/blob/19d20e5cf8d44d726b4a44575e6c8db677e4c3c8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java

Regards,
Harshith
Mime
View raw message