kafka-dev mailing list archives

From "Jun Rao (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-228) Reduce duplicate messages served by the kafka consumer for uncompressed topics to 0
Date Mon, 02 Jan 2012 18:24:30 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13178496#comment-13178496 ]

Jun Rao commented on KAFKA-228:
-------------------------------

2.1. The problem is that commitOffsets is called by auto commit too. We don't want to add
info-level logging for every auto commit.
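
To illustrate the point, here is a minimal sketch (not the actual patch; the class, the
isAutoCommit flag and the logger wiring are all made up) of keeping the auto-commit path at
debug while an explicit commit can still log at info:

    import org.apache.log4j.Logger

    // Sketch only: this class and the isAutoCommit flag are illustrative, not Kafka 0.7 code.
    class OffsetCommitter {
      private val logger = Logger.getLogger(getClass)

      def commitOffsets(isAutoCommit: Boolean): Unit = {
        // ... write the consumed offsets to ZooKeeper here ...
        if (isAutoCommit)
          logger.debug("auto-committed offsets")               // fires every autocommit interval
        else
          logger.info("committed offsets on explicit request") // rare, user-initiated
      }
    }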

2.2. Well, in general, rebalance is selective; not every rebalance affects all topics. Suppose
that there are 2 topics X and Y. X is on brokers 1, 2 and 3 and Y is only on brokers 2 and 3.
Consider a consumer group with 2 consumers c1 and c2 that consume both topics X and Y. Initially,
if all brokers are alive, after rebalance the assignment can be something like: c1 <- X (broker1,
broker2), Y (broker2); c2 <- X (broker3), Y (broker3). Suppose now that broker1 is down. After
rebalance, the assignment can be something like: c1 <- X (broker2), Y (broker2); c2 <- X (broker3),
Y (broker3). As you can see, topic Y doesn't really need to rebalance since its partition
assignment doesn't change. Our current code handles this by not clearing the queue for topic Y
in this case. The fetcher will still be restarted; it just picks up from where it left off.
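
A toy sketch of the example above, just to make it concrete (the types and broker names are
invented, this is not the rebalance code): only topic X's assignment changes, so only X's
queue needs clearing.

    // Illustrative only: consumer -> topic -> assigned brokers.
    object RebalanceExample extends App {
      val before = Map(
        "c1" -> Map("X" -> Set("broker1", "broker2"), "Y" -> Set("broker2")),
        "c2" -> Map("X" -> Set("broker3"),            "Y" -> Set("broker3")))

      val after = Map(
        "c1" -> Map("X" -> Set("broker2"), "Y" -> Set("broker2")),
        "c2" -> Map("X" -> Set("broker3"), "Y" -> Set("broker3")))

      // Per-topic check: clear a topic's queue only if its assignment changed.
      for (topic <- List("X", "Y")) {
        val changed = before.exists { case (c, m) => m(topic) != after(c)(topic) }
        println("topic " + topic + " needs queue clearing: " + changed) // X: true, Y: false
      }
    }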

3. First of all, it's not clear that the synchronization in the patch gives the exclusive access
that you want. This is mainly because the lock is temporarily released in the makeNext() call,
which allows other concurrent callers to sneak in. For example, it could be that the makeNext()
call gets the next chunk in getNextDataChunk but hasn't updated currentDataChunk yet. At this
moment, the fetcher queue is cleared and clearCurrentChunk is called. The latter saves the
current offset (which will be used to set the fetch offset after rebalance) and sets
currentDataChunk to null. After that, the makeNext() call continues in getNextDataChunk and sets
currentDataChunk to a non-null value. This chunk will be fetched again after rebalance, thus
introducing duplicates.
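
To make the window concrete, here's a stripped-down sketch, not the real ConsumerIterator; the
queue and the atomic reference below just stand in for the fetcher queue and currentDataChunk,
and the sleeps force the bad interleaving.

    import java.util.concurrent.LinkedBlockingQueue
    import java.util.concurrent.atomic.AtomicReference

    // Sketch only: stand-ins for the fetcher queue and currentDataChunk.
    object RaceSketch extends App {
      val queue = new LinkedBlockingQueue[String]()
      val currentDataChunk = new AtomicReference[String](null)
      queue.put("chunk-1")

      val consumer = new Thread(new Runnable {
        def run(): Unit = {
          val chunk = queue.take()    // lock already released while blocked here
          Thread.sleep(50)            // window in which the rebalance runs
          currentDataChunk.set(chunk) // published AFTER clearCurrentChunk ran
        }
      })

      val rebalance = new Thread(new Runnable {
        def run(): Unit = {
          queue.clear()               // clear the fetcher queue
          currentDataChunk.set(null)  // clearCurrentChunk: snapshot offset, null the field
        }
      })

      consumer.start(); Thread.sleep(10); rebalance.start()
      consumer.join(); rebalance.join()
      // chunk-1 ends up in currentDataChunk but is not covered by the offset
      // snapshot, so it gets fetched again after rebalance => duplicates.
      println("currentDataChunk = " + currentDataChunk.get)
    }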

Second, calling commitOffsets inside ConsumerIterator seems a bit complicated. I am wondering
if we can commit offsets outside of ConsumerIterator instead.
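
One possible shape for that, purely a sketch with invented names (not a proposal for the actual
API): do the commit on the rebalance path, before the queues are cleared, so the iterator never
has to call commitOffsets itself.

    // Sketch only: all names here are invented, not Kafka 0.7 APIs.
    trait OffsetStore {
      def commitConsumedOffsets(): Unit // e.g. write consumed (not fetched) offsets to ZooKeeper
    }

    class RebalanceHandler(store: OffsetStore) {
      def onRebalance(clearQueues: () => Unit): Unit = {
        // Commit the consumed offsets first, under the rebalance lock, so the
        // post-rebalance fetch resumes exactly where the application stopped.
        store.commitConsumedOffsets()
        // Only then drop the buffered chunks; anything re-fetched is at or
        // after the committed offset, so no duplicates for uncompressed topics.
        clearQueues()
      }
    }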
                
> Reduce duplicate messages served by the kafka consumer for uncompressed topics to 0
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-228
>                 URL: https://issues.apache.org/jira/browse/KAFKA-228
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.7
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>             Fix For: 0.7.1
>
>         Attachments: kafka-228.patch
>
>
> Kafka guarantees at-least-once delivery of messages. The high level consumer provides
highly available partitioned consumption of data within the same consumer group. In the event
of broker failures or consumer failures within a group, the high level consumer rebalances
and redistributes the topic partitions evenly amongst the consumers in the group. With the
current design, during this rebalancing operation, Kafka introduces duplicates in the consumed
data.

> This JIRA improves the rebalancing operation and the consumer iterator design to guarantee
0 duplicates while consuming uncompressed topics. There will be a small number of duplicates
while serving compressed data, but it will be bounded by the compression batch size.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
