kafka-dev mailing list archives

From "Neha Narkhede (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-228) Reduce duplicate messages served by the kafka consumer for uncompressed topics to 0
Date Tue, 03 Jan 2012 00:44:21 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13178585#comment-13178585 ]

Neha Narkhede commented on KAFKA-228:

2.1. Fair enough. I'll move it out of the commitOffsets() API.

>> Well, in general, rebalance is selective
2.2. There is one rebalancing case (the one you listed) where not clearing the queue would
help. The code gets slightly more complicated, but I will make the change and include it in
the next patch.

>> First of all, it's not clear if the synchronization in the patch gives the exclusive
access that you want. This is mainly because the lock is temporarily released in the makeNext()
call. This allows other concurrent callers to sneak in. For example, it could be that makeNext()
call gets the next chunk in getNextDataChunk, but hasn't updated currentDataChunk yet. At
this moment, the fetcher queue is cleared and clearCurrentChunk is called. The latter saves
the current offset (which will be used to set the fetch offset after rebalance) and sets currentDataChunk
to null. After that, the makeNext() call continues in getNextDataChunk and sets currentDataChunk
to a non-null value. This chunk will be fetched again after rebalance and thus introduce duplicates.

3. The purpose of the locking is to make a best effort to safely reduce the number of duplicates
served by the consumer iterator. The lock must always be released before entering a potentially
blocking operation. The case you've pointed out seems to be a rare corner case, at least
judging from the test runs (using KAFKA-227): over hundreds of iterations of that test, no
duplicates were reported. If you try to "fix" this case, you risk a potential deadlock, which
we must avoid. Given that, this amount of locking seems reasonable to me.
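
To make the locking rule and the race window concrete, here is a minimal Java sketch. It is not
the KAFKA-228 patch: the class ChunkIteratorSketch, its fields, and the use of long[] as a
"chunk" of offsets are all hypothetical simplifications. It only shows a lock being dropped
around a blocking queue read and the window that this opens.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for a consumer iterator (not Kafka code).
class ChunkIteratorSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final BlockingQueue<long[]> queue = new LinkedBlockingQueue<>();
    private long[] currentChunk = null; // offsets of the chunk being iterated
    private int position = 0;           // index of the next offset in currentChunk
    private long consumedOffset = -1L;  // last offset handed to the caller

    // Stand-in for the fetcher adding a chunk to the queue.
    void enqueue(long... offsets) {
        queue.add(offsets);
    }

    long next() {
        lock.lock();
        try {
            while (currentChunk == null || position >= currentChunk.length) {
                // Release the lock before the potentially blocking take().
                lock.unlock();
                long[] chunk;
                try {
                    chunk = queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                } finally {
                    lock.lock();
                }
                // Race window: clearCurrentChunk() may have run while the lock was
                // released, so this chunk can be installed after a clear.
                currentChunk = chunk;
                position = 0;
            }
            consumedOffset = currentChunk[position++];
            return consumedOffset;
        } finally {
            lock.unlock();
        }
    }

    // Called on rebalance: drops the current chunk and reports the last consumed offset.
    long clearCurrentChunk() {
        lock.lock();
        try {
            currentChunk = null;
            position = 0;
            return consumedOffset;
        } finally {
            lock.unlock();
        }
    }
}
```

Holding the lock across take() would close the window, but clearCurrentChunk() would then block
for as long as the queue stays empty, which is the deadlock risk mentioned above.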

>> Second, calling commitOffsets inside consumerIterator seems a bit complicated. I
am wondering if we can commit offsets outside of consumerIterator.
3. That protects against the duplication of the last data chunk. The best place to move this,
out of the consumer iterator, is ZookeeperConsumerConnector's closeFetchers(), after clearing
the queue and clearing the current iterator. If this change is made, the number of times we
write to zookeeper will also be reduced.
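
The ordering described above can be sketched as follows. This is an illustration under stated
assumptions, not the real ZookeeperConsumerConnector: the class RebalanceCommitSketch, the
in-memory map standing in for zookeeper, and the writes counter are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the proposed ordering inside closeFetchers().
class RebalanceCommitSketch {
    // Stand-in for the offsets stored in zookeeper (assumption: a plain map).
    final Map<String, Long> committed = new HashMap<>();
    int writes = 0; // number of simulated zookeeper writes

    final BlockingQueue<long[]> queue = new LinkedBlockingQueue<>();
    long[] currentChunk = null;  // chunk the iterator is currently serving
    long consumedOffset = -1L;   // last offset handed to the application
    final String partition;

    RebalanceCommitSketch(String partition) {
        this.partition = partition;
    }

    void closeFetchers() {
        // A real connector would stop its fetcher threads first.
        queue.clear();        // 1. drop chunks that were fetched but never served
        currentChunk = null;  // 2. drop the partially consumed chunk
        commitOffsets();      // 3. commit once, after both clears
    }

    void commitOffsets() {
        committed.put(partition, consumedOffset);
        writes++;
    }
}
```

In this sketch the commit happens once per closeFetchers() call and records only what was
actually consumed, so the data dropped in steps 1 and 2 is fetched again after the rebalance
rather than skipped.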

In our offline chat, you mentioned you want to try to refactor the patch to simplify the consumer
iterator code by removing the lock altogether and depending only on the atomic references.
I was wondering if you'd like to give that a try and upload another patch?
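
The details of that refactoring are not given in this message. Purely as an assumption about
what a lock-free variant could look like, the Java sketch below uses a compare-and-set on an
AtomicReference so that a chunk taken before a clear cannot be installed after it. The names
LockFreeChunkHolder, Chunk, snapshot, install, and clear are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lock-free holder for the iterator's current chunk (not Kafka code).
class LockFreeChunkHolder {
    // A chunk of offsets. An empty chunk doubles as a "cleared" marker.
    static final class Chunk {
        final long[] offsets;

        Chunk(long... offsets) {
            this.offsets = offsets;
        }
    }

    private final AtomicReference<Chunk> current = new AtomicReference<>(new Chunk());

    // Read the reference before the blocking queue read.
    Chunk snapshot() {
        return current.get();
    }

    // Install the fetched chunk only if nothing replaced the reference since it was read.
    // compareAndSet compares by identity, so it fails if clear() ran in between.
    boolean install(Chunk seen, Chunk fetched) {
        return current.compareAndSet(seen, fetched);
    }

    // Called on rebalance. A fresh marker object is stored on every clear so that a
    // stale install() cannot succeed by matching an earlier marker.
    void clear() {
        current.set(new Chunk());
    }
}
```

A caller would take snapshot(), block on the queue, and then call install(); if install()
returns false, a rebalance intervened and the fetched chunk should be discarded instead of
served.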

> Reduce duplicate messages served by the kafka consumer for uncompressed topics to 0
> -----------------------------------------------------------------------------------
>                 Key: KAFKA-228
>                 URL: https://issues.apache.org/jira/browse/KAFKA-228
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.7
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>             Fix For: 0.7.1
>         Attachments: kafka-228.patch
> Kafka guarantees at-least-once delivery of messages. The high level consumer provides
highly available partitioned consumption of data within the same consumer group. In the event
of broker failures or consumer failures within a group, the high level consumer rebalances
and redistributes the topic partitions evenly amongst the consumers in a group. With the current
design, during this rebalancing operation, Kafka introduces duplicates in the consumed data.

> This JIRA improves the rebalancing operation and the consumer iterator design to guarantee
0 duplicates while consuming uncompressed topics. There will be a small number of duplicates
while serving compressed data, but it will be bounded by the compression batch size.

This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira