flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-8409) Race condition in KafkaConsumerThread leads to potential NPE
Date Tue, 06 Feb 2018 19:08:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354373#comment-16354373 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-8409 at 2/6/18 7:07 PM:
--------------------------------------------------------------------

Merged.

1.5 - 3655200799929409352945a3f4fce0f3e987b9ad
1.4 - 08163009e443d00379696f9f84b3ccb0af6a25b6


was (Author: tzulitai):
Merged.

1.4 - 3655200799929409352945a3f4fce0f3e987b9ad
1.5 - 08163009e443d00379696f9f84b3ccb0af6a25b6

> Race condition in KafkaConsumerThread leads to potential NPE
> ------------------------------------------------------------
>
>                 Key: FLINK-8409
>                 URL: https://issues.apache.org/jira/browse/FLINK-8409
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0, 1.3.2, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The following lines in {{KafkaConsumerThread::setOffsetsToCommit(...)}} suggest a race condition with the asynchronous callback from committing offsets to Kafka:
> {code}
> // record the work to be committed by the main consumer thread and make sure the consumer notices that
> if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
>     log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
>         "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
>         "This does not compromise Flink's checkpoint integrity.");
> }
> this.offsetCommitCallback = commitCallback;
> {code}
> In the consumer thread's main loop, {{nextOffsetsToCommit}} is checked for pending offsets. If any are present, an asynchronous offset commit is issued. The NPE occurs when that commit completes before {{this.offsetCommitCallback = commitCallback;}} has been reached.
> A possible fix is to set the next offsets to commit and the callback instance together in a single atomic operation.
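
For illustration only, a minimal sketch of what "a single atomic operation" could look like: the offsets and the callback are bundled into one immutable object and published through a single AtomicReference, so the consumer thread can never observe the offsets without their callback. The names OffsetCommitHandoff, PendingCommit, CommitCallback and pollPendingCommit are hypothetical and are not taken from the Flink code base or from the merged commits; the String/Long map is a stand-in for the real Kafka partition and offset types.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper for illustration; not Flink's actual implementation. */
final class OffsetCommitHandoff {

    /** Hypothetical stand-in for the commit callback type. */
    public interface CommitCallback {
        void onComplete(Exception error);
    }

    /** Immutable pair, so offsets and callback are always published together. */
    public static final class PendingCommit {
        public final Map<String, Long> offsets;
        public final CommitCallback callback;

        PendingCommit(Map<String, Long> offsets, CommitCallback callback) {
            this.offsets = offsets;
            this.callback = callback;
        }
    }

    private final AtomicReference<PendingCommit> nextCommit = new AtomicReference<>();

    /**
     * Called by the checkpointing thread. One getAndSet publishes both values.
     * Returns true if an older, not yet committed pair was replaced.
     */
    public boolean setOffsetsToCommit(Map<String, Long> offsets, CommitCallback callback) {
        return nextCommit.getAndSet(new PendingCommit(offsets, callback)) != null;
    }

    /**
     * Called by the consumer thread. Takes and clears the pending pair,
     * or returns null if nothing is waiting to be committed.
     */
    public PendingCommit pollPendingCommit() {
        return nextCommit.getAndSet(null);
    }
}
```

In such a design the consumer thread would call pollPendingCommit() in its main loop and pass the callback from that same PendingCommit to the asynchronous commit, rather than reading a separately assigned field.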



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
