kafka-dev mailing list archives

From "Edoardo Comar (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned
Date Wed, 25 May 2016 17:33:12 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15300482#comment-15300482 ]

Edoardo Comar commented on KAFKA-3727:
--------------------------------------

Hi, can anyone please comment on this (IMHO) buggy behavior?
It also happens with other consumer API calls, e.g.
{code}
consumer.position(assigned-tp-that-does-not-exist);   //blocks here forever
{code}
More generally, it happens with any method that calls
`org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Set<TopicPartition>)`,

because that call ends up in listOffset(..), where the UNKNOWN_TOPIC_OR_PARTITION error reported by the future is
considered retriable, so the while loop never ends.
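
To illustrate the loop described above, here is a minimal, self-contained sketch. It is not the actual Fetcher code: the class `BoundedRetrySketch`, the `Outcome` enum and the `lookupWithBudget` method are hypothetical names made up for this example, and the attempt budget is only one possible way to bound the retries. It shows that a loop which treats the error as retriable can only terminate for a topic that never appears if something else limits it.

```java
import java.util.function.Supplier;

public class BoundedRetrySketch {
    // Hypothetical stand-in for the outcome of one offset lookup.
    enum Outcome { OK, UNKNOWN_TOPIC_OR_PARTITION }

    // Retries a lookup while it reports the retriable error, but gives up
    // after maxAttempts instead of looping indefinitely.
    // Returns the number of attempts used on success.
    static int lookupWithBudget(Supplier<Outcome> lookup, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (lookup.get() == Outcome.OK) {
                return attempt;
            }
            // A real client would back off and refresh metadata here.
        }
        throw new IllegalStateException(
            "still UNKNOWN_TOPIC_OR_PARTITION after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        // A topic that never appears: every lookup fails with the retriable error.
        Supplier<Outcome> missingTopic = () -> Outcome.UNKNOWN_TOPIC_OR_PARTITION;
        try {
            lookupWithBudget(missingTopic, 5);
        } catch (IllegalStateException e) {
            System.out.println("gave up: " + e.getMessage());
        }

        // A topic that becomes available on the third lookup.
        int[] calls = {0};
        Supplier<Outcome> lateTopic =
            () -> ++calls[0] < 3 ? Outcome.UNKNOWN_TOPIC_OR_PARTITION : Outcome.OK;
        System.out.println("succeeded after " + lookupWithBudget(lateTopic, 5) + " attempts");
    }
}
```

Without the budget (or an equivalent deadline), the first case would spin forever, which matches what I see with the manually assigned consumer.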

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-3727
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3727
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly different/inconsistent
> between a consumer that subscribed to the topic and one that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever*; this feels like a bug to me.
> Sample snippet to reproduce:
> {quote}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("********* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("********* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L);
> // will loop forever!
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for fetching offset, wait for metadata refresh (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
