kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6418) AdminClient should handle empty or null topic names better
Date Tue, 30 Jan 2018 22:48:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345941#comment-16345941
] 

ASF GitHub Bot commented on KAFKA-6418:
---------------------------------------

hachikuji closed pull request #4470: KAFKA-6418: AdminClient should handle empty or null topic
names better
URL: https://github.com/apache/kafka/pull/4470
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 6d6788dab55..36cbe6cad00 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1077,19 +1077,33 @@ void call(Call call, long now) {
         }
     }
 
+    /**
+     * Returns true if a topic name cannot be represented in an RPC.  This function does
NOT check
+     * whether the name is too long, contains invalid characters, etc.  It is better to enforce
+     * those policies on the server, so that they can be changed in the future if needed.
+     */
+    private static boolean topicNameIsUnrepresentable(String topicName) {
+        return (topicName == null) || topicName.isEmpty();
+    }
+
     @Override
     public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                            final CreateTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
         final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
         for (NewTopic newTopic : newTopics) {
-            if (topicFutures.get(newTopic.name()) == null) {
+            if (topicNameIsUnrepresentable(newTopic.name())) {
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name
'" +
+                    newTopic.name() + "' cannot be represented in a request."));
+                topicFutures.put(newTopic.name(), future);
+            } else if (!topicFutures.containsKey(newTopic.name())) {
                 topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
                 topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
             }
         }
         final long now = time.milliseconds();
-        runnable.call(new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
+        Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
             new ControllerNodeProvider()) {
 
             @Override
@@ -1128,7 +1142,10 @@ public void handleResponse(AbstractResponse abstractResponse) {
             void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
-        }, now);
+        };
+        if (!topicsMap.isEmpty()) {
+            runnable.call(call, now);
+        }
         return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
     }
 
@@ -1137,12 +1154,17 @@ public DeleteTopicsResult deleteTopics(final Collection<String>
topicNames,
                                            DeleteTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
         for (String topicName : topicNames) {
-            if (topicFutures.get(topicName) == null) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name
'" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
                 topicFutures.put(topicName, new KafkaFutureImpl<Void>());
             }
         }
         final long now = time.milliseconds();
-        runnable.call(new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
+        Call call = new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
             new ControllerNodeProvider()) {
 
             @Override
@@ -1181,7 +1203,10 @@ void handleResponse(AbstractResponse abstractResponse) {
             void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
-        }, now);
+        };
+        if (!topicNames.isEmpty()) {
+            runnable.call(call, now);
+        }
         return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
     }
 
@@ -1223,13 +1248,18 @@ public DescribeTopicsResult describeTopics(final Collection<String>
topicNames,
         final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new
HashMap<>(topicNames.size());
         final ArrayList<String> topicNamesList = new ArrayList<>();
         for (String topicName : topicNames) {
-            if (!topicFutures.containsKey(topicName)) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<TopicDescription>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name
'" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
                 topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
                 topicNamesList.add(topicName);
             }
         }
         final long now = time.milliseconds();
-        runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
+        Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
             new ControllerNodeProvider()) {
 
             private boolean supportsDisablingTopicCreation = true;
@@ -1298,7 +1328,10 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException
exception)
             void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
-        }, now);
+        };
+        if (!topicNamesList.isEmpty()) {
+            runnable.call(call, now);
+        }
         return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 84588a9f3be..186ccf06cb5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -64,6 +64,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -215,6 +216,38 @@ public void testCreateTopics() throws Exception {
         }
     }
 
+    @Test
+    public void testInvalidTopicNames() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            List<String> sillyTopicNames = Arrays.asList(new String[] {"", null});
+            Map<String, KafkaFuture<Void>> deleteFutures =
+                env.adminClient().deleteTopics(sillyTopicNames).values();
+            for (String sillyTopicName : sillyTopicNames) {
+                assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
+            }
+
+            Map<String, KafkaFuture<TopicDescription>> describeFutures =
+                    env.adminClient().describeTopics(sillyTopicNames).values();
+            for (String sillyTopicName : sillyTopicNames) {
+                assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
+            }
+
+            List<NewTopic> newTopics = new ArrayList<>();
+            for (String sillyTopicName : sillyTopicNames) {
+                newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
+            }
+            Map<String, KafkaFuture<Void>> createFutures =
+                env.adminClient().createTopics(newTopics).values();
+            for (String sillyTopicName : sillyTopicNames) {
+                assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
+            }
+        }
+    }
+
     private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC,
"mytopic3"),
         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
     private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC,
"mytopic4"),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> AdminClient should handle empty or null topic names better
> ----------------------------------------------------------
>
>                 Key: KAFKA-6418
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6418
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.0.0
>            Reporter: dan norwood
>            Priority: Minor
>             Fix For: 1.1.0
>
>
> if you try to `createTopics(Collections.singleton(new NewTopic()));` you will get something
like the following:
> {noformat}
> [2018-01-02 11:20:46,481] ERROR [kafka-admin-client-thread | adminclient-3] Uncaught
exception in thread 'kafka-admin-client-thread | adminclient-3': (org.apache.kafka.common.utils.KafkaThread)
> org.apache.kafka.common.protocol.types.SchemaException: Error computing size for field
'create_topic_requests': Error computing size for field 'topic': Missing value for field 'topic'
which has no default value.
> 	at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:94)
> 	at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:341)
> 	at org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:28)
> 	at org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:98)
> 	at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:91)
> 	at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:423)
> 	at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:397)
> 	at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:358)
> 	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:810)
> 	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1002)
> 	at java.lang.Thread.run(Thread.java:745)
> [2018-01-02 11:20:46,481] ERROR [kafka-admin-client-thread | adminclient-3] Uncaught
exception in thread 'kafka-admin-client-thread | adminclient-3': (org.apache.kafka.common.utils.KafkaThread)
> org.apache.kafka.common.protocol.types.SchemaException: Error computing size for field
'create_topic_requests': Error computing size for field 'topic': Missing value for field 'topic'
which has no default value.
> 	at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:94)
> 	at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:341)
> 	at org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:28)
> 	at org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:98)
> 	at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:91)
> 	at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:423)
> 	at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:397)
> 	at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:358)
> 	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:810)
> 	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1002)
> 	at java.lang.Thread.run(Thread.java:745)
> [2018-01-02 11:21:01,383] ERROR [qtp1875757262-59] Unhandled exception resulting in internal
server error response (io.confluent.rest.exceptions.GenericExceptionMapper)
> java.util.concurrent.TimeoutException
> 	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
> 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225)
> 	at io.confluent.controlcenter.data.KafkaDao.createTopics(KafkaDao.java:85)
> 	at io.confluent.controlcenter.rest.KafkaResource.createTopic(KafkaResource.java:87)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> {noformat}
> the actual error prints immediately, but the adminclient still waits for a timeout and
then exposes a TimeoutException to the user
> Note that no other elements of the batch request are performed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message