flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Fink: KafkaProducer Data Loss
Date Fri, 03 Feb 2017 06:06:09 GMT
Hi Ninad and Till,

Thank you for looking into the issue! This is actually a bug.

Till’s suggestion is correct:
The producer holds a `pendingRecords` value that is incremented on each invoke() and decremented
on each callback; it is used to check whether the producer needs to wait for pending callbacks on a checkpoint.
On each checkpoint, we should consider the checkpoint successful only if, after flushing,
`pendingRecords == 0` and `asyncException == null` (currently, we’re only checking `pendingRecords`).

A quick fix for this is to check for and rethrow async exceptions in the `snapshotState` method,
both before flushing and after flushing, once `pendingRecords` has dropped to 0.
I’ve filed a JIRA for this: https://issues.apache.org/jira/browse/FLINK-5701.
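
To make the proposed check concrete, here is a minimal sketch of the idea. It is not the actual FlinkKafkaProducerBase code: the class name, the `checkErroneous` helper, and passing the flush in as a `Runnable` are assumptions made for illustration, and the Kafka client is simulated by calling `onCompletion` directly.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the producer sink; not the actual Flink class. */
class CheckpointCheckSketch {
    private final AtomicLong pendingRecords = new AtomicLong();
    private volatile Exception asyncException;

    /** Called once per record handed to the (simulated) Kafka client. */
    void invoke() {
        checkErroneous();
        pendingRecords.incrementAndGet();
    }

    /** Called by the (simulated) Kafka client when a send completes. */
    void onCompletion(Exception e) {
        if (e != null && asyncException == null) {
            asyncException = e; // remember the first failure only
        }
        pendingRecords.decrementAndGet();
    }

    /** Rethrows a failure recorded by an earlier callback, if there is one. */
    void checkErroneous() {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new IllegalStateException("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    /** Checkpoint hook: check, flush, verify nothing is pending, then check again. */
    void snapshotState(Runnable flush) {
        checkErroneous();
        flush.run();
        if (pendingRecords.get() != 0) {
            throw new IllegalStateException("Pending records after flush: " + pendingRecords.get());
        }
        checkErroneous();
    }

    public static void main(String[] args) {
        CheckpointCheckSketch sink = new CheckpointCheckSketch();
        try {
            sink.invoke();
            // Simulated flush: the outstanding record completes with an error.
            sink.snapshotState(() -> sink.onCompletion(new RuntimeException("broker unavailable")));
            System.out.println("checkpoint succeeded");
        } catch (IllegalStateException e) {
            System.out.println("checkpoint failed: " + e.getMessage());
        }
    }
}
```

The second `checkErroneous()` call is the important one: a callback that fails during the flush still brings `pendingRecords` down to 0, so checking the counter alone would let the checkpoint pass.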


On February 3, 2017 at 6:05:23 AM, Till Rohrmann (trohrmann@apache.org) wrote:

Hi Ninad,

thanks for reporting the issue. To me it also looks as if exceptions might go unnoticed under
certain circumstances. For example, if a write operation fails, it sets the asyncException
field, which is not checked until the next invoke call. If a checkpoint happens in between,
it will pass and mark all messages up to that point as successfully processed. Only after
the checkpoint will the producer fail. This constitutes data loss, imho.
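
The sequence described above can be replayed with a small model. This is only a sketch under assumptions: the class and method names are hypothetical, the Kafka client is simulated by calling `onCompletion` by hand, and the checkpoint check deliberately looks at the pending count only, which is the behaviour under discussion.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the failure sequence; not Flink code. */
class SilentFailureTimeline {
    private long pendingRecords = 0;
    private Exception asyncException;
    final List<String> events = new ArrayList<>();

    /** Accepts a record unless an earlier callback recorded a failure. */
    void invoke(String record) {
        if (asyncException != null) {
            events.add("invoke(" + record + ") failed");
            throw new IllegalStateException(asyncException);
        }
        pendingRecords++;
        events.add("invoke(" + record + ") accepted");
    }

    /** Simulated send callback: stores the first error and acknowledges the record. */
    void onCompletion(Exception e) {
        if (e != null && asyncException == null) {
            asyncException = e;
        }
        pendingRecords--;
    }

    /** Checks the pending count only, so a stored error goes unnoticed. */
    boolean checkpointWithoutErrorCheck() {
        boolean ok = pendingRecords == 0;
        events.add(ok ? "checkpoint passed" : "checkpoint blocked");
        return ok;
    }

    public static void main(String[] args) {
        SilentFailureTimeline sink = new SilentFailureTimeline();
        sink.invoke("a");
        sink.onCompletion(new RuntimeException("write failed")); // record "a" never reached Kafka
        sink.checkpointWithoutErrorCheck();                        // passes anyway
        try {
            sink.invoke("b");                                      // the failure surfaces only here
        } catch (IllegalStateException e) {
            // After a restore from the checkpoint, record "a" would not be replayed.
        }
        sink.events.forEach(System.out::println);
    }
}
```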

I've looped in Robert and Gordon, who are more familiar with the Kafka producer. Maybe they
can answer your questions and mine.


On Thu, Feb 2, 2017 at 9:58 PM, ninad <nninad@gmail.com> wrote:
Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's
'retry' mechanism doesn't kick in until a message is added to its internal
buffer.

If there's an exception before that, KafkaProducer will throw that
exception, and it seems Flink isn't handling it. In this case there will
be data loss.

Related Flink code (FlinkKafkaProducerBase):

if (logFailuresOnly) {
    // Failures are only logged; the record itself is dropped.
    callback = new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
            }
        }
    };
} else {
    // The first failure is stored and rethrown later.
    callback = new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null && asyncException == null) {
                asyncException = exception;
            }
        }
    };
}

Here are the scenarios we've identified that will cause data loss:

- All Kafka brokers are down.
  In this case, before appending a message to its buffer, KafkaProducer tries
  to fetch metadata. If the KafkaProducer isn't able to fetch the metadata within
  the configured timeout, it throws an exception.
- Memory records not writable (existing bug in the Kafka library).

In both of the above cases, KafkaProducer won't retry, and Flink will ignore
the messages. The messages aren't even logged; the exception is, but not the
messages that failed.

Possible workarounds (Kafka settings):

- A very high value for metadata timeout (metadata.fetch.timeout.ms)
- A very high value for buffer expiry (request.timeout.ms)

We're still investigating the possible side effects of changing the above
Kafka settings.
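
As a sketch of how these two settings might be passed to the producer: the helper name, the timeout values, and the `bootstrap.servers` address below are placeholders chosen for illustration, not recommendations. Only the two property keys come from the list above.

```java
import java.util.Properties;

/** Hypothetical helper that raises the two timeouts mentioned above. */
class ProducerTimeoutSketch {
    /** Returns a copy of the given properties with both timeouts set to a large value. */
    static Properties withHighTimeouts(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Illustrative values (10 minutes); pick values that suit your own setup.
        props.setProperty("metadata.fetch.timeout.ms", "600000");
        props.setProperty("request.timeout.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        Properties props = withHighTimeouts(base);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` object would then be handed to whichever Kafka producer constructor is in use. Very long timeouts also mean the sink can block for that long when the brokers are unreachable.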

So, is our understanding correct? Or is there a way we can avoid this data
loss by modifying some Flink settings?


View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413.html