kafka-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4547) Consumer.position returns incorrect results for Kafka 0.10.1.0 client
Date Tue, 24 Jan 2017 20:40:26 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836603#comment-15836603 ]

ASF GitHub Bot commented on KAFKA-4547:
---------------------------------------

GitHub user vahidhashemian opened a pull request:

    https://github.com/apache/kafka/pull/2431

    KAFKA-4547 (0.10.1 hotfix): Avoid unnecessary offset commit that could lead to an invalid offset position if partition is paused

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vahidhashemian/kafka KAFKA-4547-0.10.1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2431.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2431
    
----
commit 836287820dd86c91dc422b30406aa20696996aa7
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Date:   2017-01-24T20:16:02Z

    KAFKA-4547 (0.10.1 hotfix): Avoid unnecessary offset commit that could lead to an invalid offset position if partition is paused

----


> Consumer.position returns incorrect results for Kafka 0.10.1.0 client
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-4547
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4547
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.1.0, 0.10.0.2, 0.10.1.1
>         Environment: Windows Kafka 0.10.1.0
>            Reporter: Pranav Nakhe
>            Assignee: Vahid Hashemian
>            Priority: Blocker
>              Labels: clients
>             Fix For: 0.10.2.0
>
>         Attachments: issuerep.zip
>
>
> Consider the following code -
> 		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
> 		List<TopicPartition> listOfPartitions = new ArrayList<>();
> 		for (int i = 0; i < consumer.partitionsFor("IssueTopic").size(); i++) {
> 			listOfPartitions.add(new TopicPartition("IssueTopic", i));
> 		}
> 		consumer.assign(listOfPartitions);		
> 		consumer.pause(listOfPartitions);
> 		consumer.seekToEnd(listOfPartitions);
> //		consumer.resume(listOfPartitions); -- commented out
> 		for(int i = 0; i < listOfPartitions.size(); i++) {
> 			System.out.println(consumer.position(listOfPartitions.get(i)));
> 		}
> 		
> I have created a topic IssueTopic with 3 partitions, each with a single replica, on my single-node Kafka installation (0.10.1.0).
> The behavior I noticed for Kafka client 0.10.1.0, as against Kafka client 0.10.0.1, is as follows:
> A) Initially, when there are no messages on IssueTopic, running the above program returns
>
>     0.10.1.0    0.10.0.1
>     0           0
>     0           0
>     0           0
> B) Next I send 6 messages and see that they have been evenly distributed across the three partitions. Running the above program now returns
>
>     0.10.1.0    0.10.0.1
>     0           2
>     0           2
>     2           2
>
> Clearly there is a difference in behavior between the two clients.
> Now, after the seekToEnd call, if I make a call to resume (uncomment the resume call in the code above), then the behavior is
>
>     0.10.1.0    0.10.0.1
>     2           2
>     2           2
>     2           2
> This is an issue I came across when using the Spark Kafka integration for 0.10. When I moved to Kafka client 0.10.1.0 I started seeing it. I had raised a pull request [SPARK-18779] to resolve it there, but looking at the Kafka client implementation and documentation now, it seems the issue is with Kafka and not with Spark. There does not seem to be any documentation that specifies or implies that we need to call resume after seekToEnd for position to return the correct value. There is also a clear difference in behavior between the two Kafka client implementations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
