kafka-jira mailing list archives

From "Jason Gustafson (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic
Date Thu, 19 Oct 2017 21:22:01 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211760#comment-16211760 ]

Jason Gustafson commented on KAFKA-6014:
----------------------------------------

Yeah, I agree with [~gilles.degols]. I'm not sure {{InvalidTopicException}} is quite right, though, since it is used for improperly formatted topic names. Probably we should just propagate a new {{UnknownTopicOrPartitionException}}.
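
For illustration, the proposed propagation might look roughly like this against the {{UNKNOWN_TOPIC_OR_PARTITION}} branch of {{OffsetCommitResponseHandler.handle}} (a sketch only, based on the two lines quoted at the end of the description below, not an actual patch):

{code}
-                        future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
+                        future.raise(new UnknownTopicOrPartitionException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
{code}

One caveat with this sketch: {{UnknownTopicOrPartitionException}} is retriable, so the commit path would need to decide whether to surface it to the caller or keep retrying.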

> new consumer mirror maker halts after committing offsets to a deleted topic
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6014
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6014
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Onur Karaman
>
> New consumer throws an unexpected KafkaException when trying to commit to a topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to catch the KafkaException and just kills the process. We didn't see this in the old consumer because the old consumer just silently drops failed offset commits.
> I ran a quick experiment locally to prove the behavior. The experiment:
> 1. start up a single broker
> 2. create a single-partition topic t
> 3. create a new consumer that consumes topic t
> 4. make the consumer commit every few seconds
> 5. delete topic t
> 6. expect: KafkaException that kills the process.
> Here's my script:
> {code}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Collections;
> import java.util.List;
> import java.util.Properties;
> public class OffsetCommitTopicDeletionTest {
>     public static void main(String[] args) throws InterruptedException {
>         Properties props = new Properties();
>         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
>         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>         KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
>         TopicPartition partition = new TopicPartition("t", 0);
>         List<TopicPartition> partitions = Collections.singletonList(partition);
>         kafkaConsumer.assign(partitions);
>         while (true) {
>             kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(0, "")));
>             Thread.sleep(1000);
>         }
>     }
> }
> {code}
> Here are the other commands:
> {code}
> > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> > ./gradlew clean jar
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 1
> > ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
> {code}
> Here is the output:
> {code}
> [2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] Offset commit failed on partition t-0 at offset 0: This server does not host this topic-partition. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> Exception in thread "main" org.apache.kafka.common.KafkaException: Partition t-0 may not exist or user may not have Describe access to topic
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1231)
>   at org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest.main(OffsetCommitTopicDeletionTest.java:22)
> {code}
> A couple of ways we could fix this:
> 1. make OffsetCommitResponseHandler throw a more specific exception and make MirrorMaker.commitOffsets catch it (see the sketch below the quoted description). It currently just catches WakeupException and CommitFailedException.
> 2. make OffsetCommitResponseHandler log the error and move on. This is probably the simpler option. Just delete these lines:
> {code}
> -                        future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
> -                        return;
> {code}
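
For option 1 above, a minimal caller-side sketch of what catching a more specific exception around {{commitSync}} could look like (illustrative only; the real {{MirrorMaker.commitOffsets}} is Scala, and the helper class below is hypothetical):

{code}
import java.util.Map;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.WakeupException;

// Hypothetical helper, not the actual MirrorMaker code: commit offsets but treat a deleted
// topic as non-fatal instead of letting the exception propagate and kill the process.
public class SafeOffsetCommit {
    public static void commitOffsets(KafkaConsumer<byte[], byte[]> consumer,
                                     Map<TopicPartition, OffsetAndMetadata> offsets) {
        try {
            consumer.commitSync(offsets);
        } catch (WakeupException | CommitFailedException e) {
            // MirrorMaker.commitOffsets already has specific handling for these two;
            // this sketch simply rethrows them.
            throw e;
        } catch (UnknownTopicOrPartitionException e) {
            // Only reachable if the coordinator propagates the more specific exception
            // (as sketched in the comment above): log and move on.
            System.err.println("Offset commit failed, topic may have been deleted: " + e.getMessage());
        }
    }
}
{code}

In the actual (Scala) MirrorMaker, the equivalent change would just be another case alongside the existing WakeupException and CommitFailedException handling.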



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
