kafka-dev mailing list archives

From "Esoga Simmons (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled
Date Thu, 19 Jan 2017 23:37:26 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15830829#comment-15830829 ]

Esoga Simmons edited comment on KAFKA-3450 at 1/19/17 11:37 PM:
----------------------------------------------------------------

Building on what [~turek@avast.com] said, we want the cache validation to happen out of band,
not inside the send call.  The producer has {code}KafkaProducer.partitionsFor(){code},
which uses {code}waitOnMetadata(){code} to update the topic cache.  We verified that calling
it at an interval shorter than the max default keeps the send call from reaching the state
where it makes the blocking external call. We handle errors in the callback.
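
A minimal sketch of that periodic refresh, for illustration only. The class name
MetadataWarmer, the probe parameter and the period are hypothetical; the idea is that the
caller passes something like producer::partitionsFor as the probe, and picks a period shorter
than the metadata expiry that applies to their client version. This is not code from the
Kafka client itself.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: refreshes topic metadata on a background thread. */
public class MetadataWarmer implements AutoCloseable {
    // Single daemon thread so the warmer never keeps the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "metadata-warmer");
                t.setDaemon(true);
                return t;
            });

    /**
     * @param probe  blocking metadata lookup; assumed to be producer::partitionsFor
     * @param topics topics the application sends to
     * @param period refresh period; assumed shorter than the metadata expiry
     * @param unit   time unit of the period
     */
    public MetadataWarmer(Consumer<String> probe, List<String> topics,
                          long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> {
            for (String topic : topics) {
                try {
                    // May block, but only on this background thread.
                    probe.accept(topic);
                } catch (RuntimeException e) {
                    // Swallow the failure: an uncaught exception would cancel
                    // all future runs of a scheduleAtFixedRate task.
                    System.err.println("Metadata refresh failed for " + topic + ": " + e);
                }
            }
        }, 0, period, unit);
    }

    /** Stops the background refresh. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Usage would look roughly like new MetadataWarmer(producer::partitionsFor, topics, 1,
TimeUnit.MINUTES), closed together with the producer. The blocking lookup still happens, but
on the warmer thread rather than on the thread that calls send().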


was (Author: esogs):
Building off of what [~turek@avast.com] said, we want the call to validate the cache to happen
out of line of the send event.  The producer has {code}KafkaProducer.partitionsFor(){code}
that uses {code}waitOnMetadata(){code}, which updates the topic cache.  We verified that if
we call that at an interval less than the default 5 minutes, we can prevent the send event
from entering the condition where it does the external blocking call. We handle errors in
the callback.

> Producer blocks on send to topic that doesn't exist if auto create is disabled
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-3450
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3450
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.9.0.1
>            Reporter: Michal Turek
>            Assignee: Jun Rao
>            Priority: Critical
>
> {{producer.send()}} blocks for {{max.block.ms}} (default 60 seconds) if the destination
> topic doesn't exist and automatic topic creation is disabled. A warning from NetworkClient
> containing UNKNOWN_TOPIC_OR_PARTITION is logged every 100 ms in a loop until the 60-second
> timeout expires, even though the operation is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * Test of sending to a topic that does not exist while automatic creation of topics
>  * is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
>     private static final Logger LOGGER = LoggerFactory.getLogger(NoSuchTopicTest.class);
>     public static void main(String[] args) {
>         Properties properties = new Properties();
>         properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, NoSuchTopicTest.class.getSimpleName());
>         // Default is 60 seconds
>         properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
>         try (Producer<String, String> producer = new KafkaProducer<>(properties,
>                 new StringSerializer(), new StringSerializer())) {
>             // First send: the topic does not exist, so the callback reports the failure.
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "key", "value"),
>                     (metadata, exception) -> {
>                 if (exception != null) {
>                     LOGGER.error("Send failed: {}", exception.toString());
>                 } else {
>                     LOGGER.info("Send successful: {}-{}/{}", metadata.topic(),
>                             metadata.partition(), metadata.offset());
>                 }
>             });
>             // Second send: a different topic that does not exist either.
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", "key", "value"),
>                     (metadata, exception) -> {
>                 if (exception != null) {
>                     LOGGER.error("Send failed: {}", exception.toString());
>                 } else {
>                     LOGGER.info("Send successful: {}-{}/{}", metadata.topic(),
>                             metadata.partition(), metadata.offset());
>                 }
>             });
>         }
>     }
> }
> {noformat}
> Related output
> {noformat}
> 2016-03-23 12:44:37.725 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Sending message (NoSuchTopicTest.java:26)
> 2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 0 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 1 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 2 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 3 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.231 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 4 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.332 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 5 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.433 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 6 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.534 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 7 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.635 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 8 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.736 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 9 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.772 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Send failed: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 35 ms. (NoSuchTopicTest.java:29)
> 2016-03-23 12:44:38.773 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Sending message (NoSuchTopicTest.java:35)
> 2016-03-23 12:44:38.837 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 10 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.938 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 11 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.039 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 12 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.140 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 13 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.242 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 14 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.345 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 15 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.447 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 16 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.549 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 17 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.651 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 18 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.752 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 19 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.774 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Send failed: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 21 ms. (NoSuchTopicTest.java:38)
> 2016-03-23 12:44:39.774 INFO  o.a.k.c.producer.KafkaProducer      [main]: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (KafkaProducer.java:613)
> {noformat}
> Known workaround
> - Configure {{max.block.ms = 0}} in the producer to prevent blocking and return from send()
> immediately. But be careful: I'm not sure whether this is safe or whether it could cause
> something even worse ;-)
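
For reference, the workaround above might be configured along these lines. This is only a
sketch: the class name NonBlockingProducerConfig and the build() helper are hypothetical, and
the plain string keys are the standard producer config names. With a zero block time, a send
to a topic whose metadata is not yet cached would presumably fail right away instead of
waiting, so the caveat in the workaround still applies.

```java
import java.util.Properties;

/** Hypothetical helper that builds producer settings for the max.block.ms workaround. */
public class NonBlockingProducerConfig {

    /** Returns producer properties with blocking in send() disabled. */
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // 0 means send() does not wait for metadata or buffer space.
        props.setProperty("max.block.ms", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        System.out.println("max.block.ms = " + props.getProperty("max.block.ms"));
    }
}
```

The resulting Properties object would be passed to the KafkaProducer constructor together
with the serializers, as in the test program quoted above.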



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
