kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Randall Hauch (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5731) Connect WorkerSinkTask out of order offset commit can lead to inconsistent state
Date Tue, 22 Aug 2017 23:37:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137610#comment-16137610
] 

Randall Hauch commented on KAFKA-5731:
--------------------------------------

[~hachikuji] or [~ewencp]: I've reopened the issue so this can be backported to the {{0.10.2}}
branch, and added [this pull request|https://github.com/apache/kafka/pull/3717] for that branch.
Thanks!

> Connect WorkerSinkTask out of order offset commit can lead to inconsistent state
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-5731
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5731
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Jason Gustafson
>            Assignee: Randall Hauch
>             Fix For: 0.11.0.1
>
>
> In Connect's WorkerSinkTask, we do sequence number validation to ensure that offset commits
are handled in the right order (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L199).

> Unfortunately, for asynchronous commits, the {{lastCommittedOffsets}} field is overridden
regardless of this sequence check as long as the response had no error (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L284):
> {code:java}
>             OffsetCommitCallback cb = new OffsetCommitCallback() {
>                 @Override
>                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception error) {
>                     if (error == null) {
>                         lastCommittedOffsets = offsets;
>                     }
>                     onCommitCompleted(error, seqno);
>                 }
>             };
> {code}
> Hence if we get an out of order commit, then the internal state will be inconsistent.
To fix this, we should only override {{lastCommittedOffsets}} after sequence validation as
part of the {{onCommitCompleted(...)}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message