kafka-dev mailing list archives

From "Vahid Hashemian (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming
Date Mon, 06 Mar 2017 22:18:33 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15898258#comment-15898258 ]

Vahid Hashemian commented on KAFKA-4845:
----------------------------------------

[~DanC], The issue you raised sounds very similar to the one reported in [KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547], which was fixed in 0.10.2.0.
Also, the second code snippet and the comments you added there apply to the code as it was before [KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547] was [fixed|https://github.com/apache/kafka/commit/813897a00653351710d37acbbb598235e86db824#diff-267b7c1e68156c1301c56be63ae41dd0].
The function {{updateFetchPositions(Set<TopicPartition> partitions)}} currently looks like this:
{code}
        // first honor any pending offset reset (e.g. from a prior seekToEnd)
        fetcher.resetOffsetsIfNeeded(partitions);
        // this check is now scoped to the requested partitions only
        if (!subscriptions.hasAllFetchPositions(partitions)) {
            coordinator.refreshCommittedOffsetsIfNeeded();
            fetcher.updateFetchPositions(partitions);
        }
{code}
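
The key change from [KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547] is that the check is performed per requested partition rather than across the whole assignment. A rough illustration of that semantics (the method body and the {{hasValidPosition}} helper here are assumptions for illustration, not the exact {{SubscriptionState}} internals):
{code}
// Illustrative sketch only, not the actual SubscriptionState code.
// With the partitions argument, a position() call for a single partition
// no longer falls into the committed-offset path just because other
// assigned partitions still lack fetch positions.
boolean hasAllFetchPositions(Collection<TopicPartition> partitions) {
    for (TopicPartition tp : partitions)
        if (!hasValidPosition(tp))  // assumed per-partition helper
            return false;
    return true;
}
{code}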

So it sounds like you are not running the latest KafkaConsumer code. The issue you raised applies to 0.10.1.0 and 0.10.1.1 (as a duplicate of [KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547]), but not to 0.10.2.0.
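
If it helps to double check, a minimal standalone test along these lines (the broker address, topic name, group id, and partition count are placeholders) should print the log-end offset for every partition on 0.10.2.0, whereas on 0.10.1.x all but the last partition would report the committed offset:
{code}
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekToEndCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("group.id", "seek-to-end-check");        // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assumes a topic "test-topic" with at least two partitions
            List<TopicPartition> parts = Arrays.asList(
                    new TopicPartition("test-topic", 0),
                    new TopicPartition("test-topic", 1));
            consumer.assign(parts);
            consumer.seekToEnd(parts);
            // position() triggers updateFetchPositions() for each partition
            for (TopicPartition tp : parts)
                System.out.println(tp + " -> " + consumer.position(tp));
        }
    }
}
{code}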

Please advise if I've misunderstood the defect or am missing something. Thanks.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-4845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4845
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>            Reporter: Dan
>            Assignee: Vahid Hashemian
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the latest offsets for any partition except one. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
>     val c = consumer
>     c.poll(0)
>     val parts = c.assignment().asScala
>     val newPartitions = parts.diff(currentOffsets.keySet)
>     currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
>     c.pause(newPartitions.asJava)
>     c.seekToEnd(currentOffsets.keySet.asJava)
>     parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
>         fetcher.resetOffsetsIfNeeded(partitions);    // reset to latest offset for current partition
>         if (!subscriptions.hasAllFetchPositions()) {  // seekToEnd was called for all partitions before, so this condition is true
>             coordinator.refreshCommittedOffsetsIfNeeded();
>             fetcher.updateFetchPositions(partitions);  // reset to committed offsets for current partition
>         }
> {code}
> So eventually only one partition (the last partition in the assignment) can get the latest offset, while all the others get the committed offset.
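
As a side note, on the affected 0.10.1.x releases the log-end offsets can also be obtained without going through {{position()}} and {{updateFetchPositions()}} at all, via the {{endOffsets()}} API (available since 0.10.1.0). A rough Java sketch of that workaround (not the actual Spark code):
{code}
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LatestOffsetsWorkaround {
    // Sketch of a possible workaround: endOffsets() queries the brokers
    // directly for log-end offsets, so it never touches the consumer's
    // fetch positions and sidesteps the buggy code path above.
    public static Map<TopicPartition, Long> latestOffsets(KafkaConsumer<?, ?> consumer) {
        return consumer.endOffsets(consumer.assignment());
    }
}
{code}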



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
