flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-5701) FlinkKafkaPrdocuer violates at-least-once by not handling failed records
Date Fri, 03 Feb 2017 05:47:51 GMT
Tzu-Li (Gordon) Tai created FLINK-5701:
------------------------------------------

             Summary: FlinkKafkaPrdocuer violates at-least-once by not handling failed records
                 Key: FLINK-5701
                 URL: https://issues.apache.org/jira/browse/FLINK-5701
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector, Streaming Connectors
            Reporter: Tzu-Li (Gordon) Tai


Reported in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html

The problem:

The producer holds a {{pendingRecords}} value that is incremented on each invoke() and decremented
on each callback, used to check if the producer needs to sync on pending callbacks on checkpoints.
On each checkpoint, we should only consider the checkpoint succeeded iff after flushing the
{{pendingRecords == 0}} and {{asyncException == null}} (currently, we’re only checking {{pendingRecords}}).

Generally, to fix this, we need to handle exceptions in the callback and re-add the original
record back into the producer. I think the {{onComplete}} method is called after the KafkaProducer
internally finishes all retry attempts and is removed from the buffer, so if we don’t do
anything with the exception other than just logging it, the message will be lost.

Two additional things we need to address in order to solve this:
1. {{FlinkKafkaProducer}} needs to keep a map of callback to their corresponding original
record.

2. We need to determine what async exceptions to actually re-add to the FlinkKafkaProducer.
We simply cannot re-add for every exception, otherwise  errors that simply cannot be resolved
by retrying will hang the checkpoint flush process forever, and it'll be unclear to the user
why the checkpoint is taking so long. The ElasticsearchSink has similar issues (FLINK-5353
and FLINK-5122). The proposed approach for this, instead of determining which async exceptions
to retry case by case, is to let the user provide async failure handlers and let them implement
logic on which exceptions to handle / re-add.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message