kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove throwing exception if not found from describe topics (#6112)
Date Thu, 10 Jan 2019 18:03:19 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c238af2  MINOR: Remove throwing exception if not found from describe topics (#6112)
c238af2 is described below

commit c238af29bf50cade5aa10c1bb2678ad01cfbbf47
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Thu Jan 10 13:03:11 2019 -0500

    MINOR: Remove throwing exception if not found from describe topics (#6112)
    
    We recently improved the handling of the InternalTopicManager retries with #6085. The
AdminClient will throw an InvalidTopicException if the topic is not found. We need to ignore
that exception because, when calling AdminClient#describeTopics, we may not yet have had a
chance to create the topic, especially in the case of internal topics.
    
    I've created a new test asserting that when an InvalidTopicException is thrown because the
topic is not found, we log the condition and continue on instead of throwing.
    
    Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../processor/internals/InternalTopicManager.java  |  7 +++--
 .../internals/InternalTopicManagerTest.java        | 31 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 40c25d1..ee7fd3e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -203,9 +204,11 @@ public class InternalTopicManager {
             } catch (final ExecutionException couldNotDescribeTopicException) {
                 final Throwable cause = couldNotDescribeTopicException.getCause();
                 if (cause instanceof UnknownTopicOrPartitionException ||
-                    cause instanceof LeaderNotAvailableException) {
+                    cause instanceof LeaderNotAvailableException ||
+                    (cause instanceof InvalidTopicException &&
+                        cause.getMessage().equals("Topic " + topicName + " not found."))) {
                     // This topic didn't exist or leader is not known yet, proceed to try to create it
-                    log.debug("Topic {} is unknown, hence not existed yet.", topicName);
+                    log.debug("Topic {} is unknown or not found, hence not existed yet.", topicName);
                 } else {
                     log.error("Unexpected error during topic description for {}.\n" +
                         "Error message was: {}", topicName, cause.toString());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index e91bf32..074228a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -193,6 +194,36 @@ public class InternalTopicManagerTest {
     }
 
     @Test
+    public void shouldLogWhenTopicNotFoundAndNotThrowException() {
+        LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        mockAdminClient.addTopic(
+            false,
+            topic,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+
+        final InternalTopicConfig internalTopicConfigII = new RepartitionTopicConfig("internal-topic", Collections.emptyMap());
+        internalTopicConfigII.setNumberOfPartitions(1);
+
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(topic, internalTopicConfig);
+        topicConfigMap.put("internal-topic", internalTopicConfigII);
+
+
+        internalTopicManager.makeReady(topicConfigMap);
+        boolean foundExpectedMessage = false;
+        for (final String message : appender.getMessages()) {
+            foundExpectedMessage |= message.contains("Topic internal-topic is unknown or not found, hence not existed yet.");
+        }
+        assertTrue(foundExpectedMessage);
+
+    }
+
+    @Test
     public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
         mockAdminClient.addTopic(
             false,


Mime
View raw message