kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Mirza Gaush Beg (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-2359) New consumer - partitions auto assigned only on poll
Date Fri, 17 Jun 2016 15:45:05 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336331#comment-15336331

Mirza Gaush Beg commented on KAFKA-2359:

Another use case: 
i want to retrieve data based on an offset range (starting from last committed offset to the
last message present at topic partition) using  'KafkaUtils.createRDD' for the same consumer
group.id.  I am using  high level consumer API from, then following below steps,
 2. Call 'subscribe'
 3. call 'partitionsFor'
4. call 'committed' - gives the last committed offset
5. call 'seekToEnd' - to retrieve the last offset  and this fails with "Caused by: java.lang.IllegalStateException:
No current assignment for partition test-topic-0". exception is seen with other two methods
'seek' and 'seekToBeginning'

> New consumer - partitions auto assigned only on poll
> ----------------------------------------------------
>                 Key: KAFKA-2359
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2359
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions:
>            Reporter: Stevo Slavic
>            Priority: Minor
> In the new consumer I encountered unexpected behavior. After constructing {{KafkaConsumer}}
instance with configured consumer rebalance callback handler, and subscribing to a topic with
"consumer.subscribe(topic)", retrieving subscriptions would return empty set and callback
handler would not get called (no partitions ever assigned or revoked), no matter how long
instance was up.
> Then I found by inspecting {{KafkaConsumer}} code that partition assignment will only
be triggered on first {{poll}}, since {{pollOnce}} has:
> {noformat}
> // ensure we have partitions assigned if we expect to
> if (subscriptions.partitionsAutoAssigned())
>     coordinator.ensurePartitionAssignment();
> {noformat}
> I'm proposing to fix this by including same {{ensurePartitionAssignment}} fragment in
{{KafkaConsumer.subscriptions}} accessor as well.

This message was sent by Atlassian JIRA

View raw message