flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints
Date Fri, 03 Feb 2017 05:57:51 GMT

     [ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai updated FLINK-5701:
---------------------------------------
    Description: 
Reported in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html

The problem:

The producer holds a {{pendingRecords}} counter that is incremented on each {{invoke()}} and decremented
on each send callback; on checkpoints, it is used to decide whether the producer must wait for pending callbacks.
A checkpoint should only be considered successful iff, after flushing, {{pendingRecords == 0}} and
{{asyncException == null}} (currently, only {{pendingRecords}} is checked).
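
To make the bookkeeping concrete, here is a minimal stand-alone model of the two fields named above. The class {{PendingRecordTracker}} and its methods are hypothetical illustrations, not the actual {{FlinkKafkaProducer}} code; they only mimic an asynchronous client that reports each send's outcome through a callback. The point it shows is that a failed send still decrements the counter, so checking {{pendingRecords}} alone lets a lost record pass the checkpoint.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical model of the producer's bookkeeping (not the real FlinkKafkaProducer):
 * one counter for in-flight records and one slot for the first async send failure.
 */
class PendingRecordTracker {
    private final AtomicLong pendingRecords = new AtomicLong();
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();

    /** Called from invoke() just before a record is handed to the async client. */
    void onSend() {
        pendingRecords.incrementAndGet();
    }

    /** Called from the send callback; exception is null when the send succeeded. */
    void onCompletion(Exception exception) {
        if (exception != null) {
            // Keep only the first failure.
            asyncException.compareAndSet(null, exception);
        }
        // The counter drops whether or not the send succeeded.
        pendingRecords.decrementAndGet();
    }

    long pendingRecords() {
        return pendingRecords.get();
    }

    /** Condition requested by the issue: nothing in flight AND no recorded failure. */
    boolean checkpointMayCommit() {
        return pendingRecords.get() == 0 && asyncException.get() == null;
    }

    /** Condition described as the current behaviour: only the counter is consulted. */
    boolean checkpointMayCommitCountOnly() {
        return pendingRecords.get() == 0;
    }

    public static void main(String[] args) {
        PendingRecordTracker tracker = new PendingRecordTracker();
        tracker.onSend();
        tracker.onSend();
        tracker.onCompletion(null);                                 // first record acknowledged
        tracker.onCompletion(new RuntimeException("send failed"));  // second record lost
        System.out.println(tracker.checkpointMayCommitCountOnly()); // true: loss goes unnoticed
        System.out.println(tracker.checkpointMayCommit());          // false: checkpoint must fail
    }
}
```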

A quick fix for this is to check and rethrow async exceptions in the {{snapshotState}} method
both before flushing and after flushing, once {{pendingRecords}} has reached 0.
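
The proposed fix can be sketched as follows. This is an illustrative model, not the patch for this issue: {{CheckpointingSink}}, its {{simulateFailure}} flag, the exception message, and the single-thread executor standing in for the Kafka client's I/O thread are all assumptions. {{snapshotState()}} rethrows any already-recorded async exception, waits for {{pendingRecords}} to reach 0 (the flush), and then checks again, because callbacks that complete during the flush can report new failures.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical model of the proposed snapshotState() fix (not the actual
 * FlinkKafkaProducer patch). A single-thread executor stands in for the
 * Kafka client's I/O thread that completes sends and invokes callbacks.
 */
class CheckpointingSink {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final Object lock = new Object();
    private long pendingRecords; // guarded by lock
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();

    /** Stand-in for invoke(); simulateFailure is a hypothetical test hook. */
    void invoke(String record, boolean simulateFailure) throws Exception {
        checkErroneous(); // surface earlier failures as soon as possible
        synchronized (lock) {
            pendingRecords++;
        }
        ioThread.execute(() -> onCompletion(
                simulateFailure ? new RuntimeException("failed to send " + record) : null));
    }

    /** Send callback: record the first failure, then release the pending slot. */
    private void onCompletion(Exception exception) {
        if (exception != null) {
            asyncException.compareAndSet(null, exception);
        }
        synchronized (lock) {
            pendingRecords--;
            if (pendingRecords == 0) {
                lock.notifyAll();
            }
        }
    }

    /** Stand-in for snapshotState(): check, flush and wait, then check again. */
    void snapshotState() throws Exception {
        checkErroneous(); // 1. failures reported before the checkpoint
        synchronized (lock) {
            while (pendingRecords != 0) { // 2. wait until every callback has fired
                lock.wait();
            }
        }
        checkErroneous(); // 3. failures reported while flushing
    }

    private void checkErroneous() throws Exception {
        Exception e = asyncException.get();
        if (e != null) {
            throw new Exception("Asynchronous send failed; failing the checkpoint", e);
        }
    }

    void close() {
        ioThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        CheckpointingSink sink = new CheckpointingSink();
        sink.invoke("a", false);
        sink.invoke("b", true);
        try {
            sink.snapshotState();
            System.out.println("checkpoint succeeded");
        } catch (Exception e) {
            // prints "checkpoint failed: failed to send b"
            System.out.println("checkpoint failed: " + e.getCause().getMessage());
        } finally {
            sink.close();
        }
    }
}
```

Checking only after the wait would also catch the failure here; the extra check before flushing simply fails the checkpoint early, without waiting on callbacks, when an error is already known.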

  was:
Reported in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html

The problem:

The producer holds a {{pendingRecords}} counter that is incremented on each invoke() and decremented
on each send callback; on checkpoints, it is used to decide whether the producer must wait for pending callbacks.
A checkpoint should only be considered successful iff, after flushing, {{pendingRecords == 0}} and
{{asyncException == null}} (currently, only {{pendingRecords}} is checked).

A quick fix for this is to check and rethrow async exceptions in the `snapshotState` method
both before flushing and after flushing, once `pendingRecords` has reached 0.


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
